linkspace/commons/
status.rs

1// Copyright Anton Sol
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6use linkspace_core::query::{WatchID, WatchIDRef};
7use point::lk_linkpoint_ref;
8use query::LkQuery;
9
10/**
11Status queries can communicate if a process exists that is handling a specific 'type' of events
12
13Note that this is only for local status updates.
14The group argument does not ask inside GROUP, it only signals which group the query is about.
15Other processes are meant to answer a request.
16
17A request is a packet in the form [wg]:[#:0]:/+status/GROUP/type(/instance?) , with no data and no links.
18A reply is of the form [wg]:[#:0]/+status/GROUP/type/instance with some data and at least some links.
19
20A request without 'instance' should be answered by all instances.
21
22The reply must have an 'instance' set. It defaults to 'default'.
23The reply data should be either "OK\n" or "ERR\n" followed by more info.
24The reply process links can start with init:[#:0] at first and should point to previous replies after that.
25
26A new request is not made if one was made after now-timeout.
27I.e. a process checks if a request was made since now-timeout, before making a new request, and returns after last_req+timeout.
28A reply is accepted if it was made now-timeout.
29
30This might change
31**/
32use crate::*;
33pub const STATUS_PATH_PREFIX: LkPathArray = lkp::lkpath(&[b"+status"]);
34
35#[derive(Clone)]
36#[repr(C)]
37/// Arguments for a local status request.
38pub struct LkcLocalStatusArgs {
39    /// the domain of interests
40    pub domain: Domain,
41    /// the group of interests (NOT the group to query, see mod level doc on the usage)
42    pub group: GroupID,
43    /// the object type of interests
44    pub objtype: Vec<u8>,
45    /// the instance
46    pub instance: Option<Vec<u8>>,
47    /// watch id to use - reuse of an id will remove the listening query
48    pub watch: WatchID,
49}
50impl Default for LkcLocalStatusArgs {
51    fn default() -> Self {
52        Self {
53            domain: crate::work_env::lke_domain(),
54            group: crate::work_env::lke_group(),
55            objtype: vec![],
56            instance: None,
57            watch: b"+status".to_vec(),
58        }
59    }
60}
61impl std::fmt::Debug for LkcLocalStatusArgs {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("LkStatus")
64            .field("domain", &self.domain)
65            .field("group", &self.group)
66            .field("objtype", &AB(&self.objtype))
67            .field("instance", &self.instance.as_ref().map(AB))
68            .finish()
69    }
70}
71#[doc(hidden)]
72pub fn lkc_lstatus_path(status: &LkcLocalStatusArgs) -> LkResult<LkPathArray> {
73    let mut space = STATUS_PATH_PREFIX;
74    space.push(&*status.group)?;
75    space.push(&status.objtype)?;
76    if let Some(v) = status.instance.as_ref() {
77        space.push(v)?;
78    }
79    Ok(space)
80}
81
82/// A query that returns both requests and updates
83pub fn lkc_lstatus_request(status: &LkcLocalStatusArgs) -> LkResult<PointBox> {
84    lk_linkpoint_ref(
85        status.domain,
86        PRIVATE,
87        &lkc_lstatus_path(status)?,
88        &[],
89        &[],
90        lkp::now(),
91    )
92    .map(|p| p.as_box())
93}
94
95/// A query that returns both requests and updates
96pub fn lkc_lstatus_overwatch(status: &LkcLocalStatusArgs, max_age: Stamp) -> LkResult<LkQuery> {
97    let LkcLocalStatusArgs { domain, .. } = status;
98    let space = lkc_lstatus_path(status)?;
99    let create_after = now().saturating_sub(max_age);
100    let mut q = query::LK_Q.clone();
101    lkq_insert_mut(&mut q, "group", "=", &*PRIVATE)?;
102    lkq_insert_mut(&mut q, "domain", "=", &**domain)?;
103    lkq_insert_mut(&mut q, "stamp", ">", &*create_after)?;
104    lkq_insert_mut(&mut q, "prefix", "=", space.as_bytes())?;
105    Ok(q)
106}
107
108#[cfg(any(feature = "lmdb", feature = "inmem"))]
109/// watch for any points matching
110pub fn lkc_lstatus_watch(
111    status: &LkcLocalStatusArgs,
112    d_timeout: Stamp,
113    mut cb: impl crate::system::cb::PointHandler + 'static,
114) -> LkResult<bool> {
115    use crate::system::{lks_scan, lks_tap2};
116    use linkspace_core::prelude::U16;
117    use tracing::debug_span;
118
119    let span = debug_span!("status_poll", ?status, ?d_timeout);
120    let _ = span.enter();
121    let mut ok = false;
122    let mut last_request = Stamp::ZERO;
123    let mut query: LkQuery = lkc_lstatus_overwatch(status, d_timeout)?;
124    // We want to capture any old request, so we first lks_scan_all both requests and replies.
125    lks_scan(&query, &mut |point| {
126        if point.get_links().is_empty() && point.data().is_empty() {
127            last_request = *point.get_stamp();
128            tracing::debug!(point=%PointFmt(&point),"recently requested");
129            false
130        } else {
131            ok = true;
132            let cnt = (cb).on_point(point, Some(&status.watch));
133            tracing::debug!("recently replied");
134            cnt.is_break()
135        }
136    })?;
137    if last_request == Stamp::ZERO {
138        tracing::debug!("creating new req");
139        let req = lkc_lstatus_request(status).unwrap();
140        last_request = *req.get_stamp();
141        lks_save(&req)?;
142    }
143    let wait_until = last_request.saturating_add(d_timeout);
144    tracing::debug!(?wait_until, "Waiting until");
145    lkq_insert_mut(&mut query, "data_size", ">", &*U16::ZERO)?;
146    lkq_insert_mut(&mut query, "links_len", ">", &*U16::ZERO)?;
147    lkq_insert_mut(&mut query, "recv", "<", &*wait_until)?;
148    lkq_insert_mut(&mut query, "", "scan/0", &[])?;
149    lkq_insert_mut(&mut query, "", "watch", &status.watch)?;
150    lks_tap2(query, cb, span)?;
151    Ok(ok)
152}
153
154fn is_status_reply(status: &LkcLocalStatusArgs, rspace: &LkPath, point: &PointPtr) -> LkResult<()> {
155    anyhow::ensure!(
156        *point.get_domain() == status.domain
157            && *point.get_group() == PRIVATE
158            && point.get_path() == rspace
159            && !point.get_links().is_empty()
160            && !point.data().is_empty(),
161        "invalid status update"
162    );
163    Ok(())
164}
165
166/// Insert a callback that is triggered on a request. Must yield a valid response.
167#[cfg(any(feature = "lmdb", feature = "inmem"))]
168pub fn lkc_lstatus_set(
169    status: LkcLocalStatusArgs,
170    mut update: impl FnMut(Domain, GroupID, &LkPath, Link) -> LkResult<PointBox> + 'static,
171) -> LkResult<()> {
172    use crate::system::lks_tap2;
173    use lkp::U16;
174    use tracing::debug_span;
175
176    let span = debug_span!("status_set", ?status);
177    let _ = span.enter();
178
179    let status_path = lkc_lstatus_path(&status)?;
180    let link = Link {
181        tag: ab(b"init-status*"),
182        ptr: PRIVATE,
183    };
184    let initpoint = update(status.domain, PRIVATE, &status_path, link)?;
185    is_status_reply(&status, &status_path, &initpoint)?;
186    let mut prev = initpoint.hash();
187    tracing::debug!(?initpoint, "init status");
188    lks_save(&initpoint)?;
189    std::mem::drop(initpoint);
190
191    let mut q = query::LK_Q.clone();
192    let prefix = lkc_lstatus_path(&LkcLocalStatusArgs {
193        instance: None,
194        ..status.clone()
195    })?;
196    lkq_insert_mut(&mut q, "data_size", "=", &*U16::ZERO)?;
197    lkq_insert_mut(&mut q, "links_len", "=", &*U16::ZERO)?;
198    lkq_insert_mut(&mut q, "prefix", "=", prefix.as_bytes())?;
199    // We only care about new packets. Worst case a request was received and timeout between our init and this cb.
200    lkq_insert_mut(&mut q, "", "scan/0", b"")?;
201    lkq_insert_mut(&mut q, "", "watch", &status.watch)?;
202    lks_tap2(
203        q,
204        move |point: &dyn Point, _watch: Option<&WatchIDRef>| -> LkResult<()> {
205            let p = point.get_path();
206
207            if *p.nparts() == *status_path.nparts() && p.segments() != status_path.segments() {
208                return Ok(());
209            }
210
211            let link = Link {
212                tag: ab(b"prev-status*"),
213                ptr: prev,
214            };
215            let reply = update(status.domain, PRIVATE, &status_path, link)?;
216            is_status_reply(&status, &status_path, &reply)?;
217            prev = reply.hash();
218            tracing::debug!(?reply, "Reply status");
219            lks_save(&reply)?;
220            Ok(())
221        },
222        span,
223    )?;
224    Ok(())
225}