1use linkspace_core::query::{WatchID, WatchIDRef};
7use point::lk_linkpoint_ref;
8use query::LkQuery;
9
10use crate::*;
33pub const STATUS_PATH_PREFIX: LkPathArray = lkp::lkpath(&[b"+status"]);
34
35#[derive(Clone)]
36#[repr(C)]
37pub struct LkcLocalStatusArgs {
39 pub domain: Domain,
41 pub group: GroupID,
43 pub objtype: Vec<u8>,
45 pub instance: Option<Vec<u8>>,
47 pub watch: WatchID,
49}
50impl Default for LkcLocalStatusArgs {
51 fn default() -> Self {
52 Self {
53 domain: crate::work_env::lke_domain(),
54 group: crate::work_env::lke_group(),
55 objtype: vec![],
56 instance: None,
57 watch: b"+status".to_vec(),
58 }
59 }
60}
61impl std::fmt::Debug for LkcLocalStatusArgs {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("LkStatus")
64 .field("domain", &self.domain)
65 .field("group", &self.group)
66 .field("objtype", &AB(&self.objtype))
67 .field("instance", &self.instance.as_ref().map(AB))
68 .finish()
69 }
70}
71#[doc(hidden)]
72pub fn lkc_lstatus_path(status: &LkcLocalStatusArgs) -> LkResult<LkPathArray> {
73 let mut space = STATUS_PATH_PREFIX;
74 space.push(&*status.group)?;
75 space.push(&status.objtype)?;
76 if let Some(v) = status.instance.as_ref() {
77 space.push(v)?;
78 }
79 Ok(space)
80}
81
82pub fn lkc_lstatus_request(status: &LkcLocalStatusArgs) -> LkResult<PointBox> {
84 lk_linkpoint_ref(
85 status.domain,
86 PRIVATE,
87 &lkc_lstatus_path(status)?,
88 &[],
89 &[],
90 lkp::now(),
91 )
92 .map(|p| p.as_box())
93}
94
95pub fn lkc_lstatus_overwatch(status: &LkcLocalStatusArgs, max_age: Stamp) -> LkResult<LkQuery> {
97 let LkcLocalStatusArgs { domain, .. } = status;
98 let space = lkc_lstatus_path(status)?;
99 let create_after = now().saturating_sub(max_age);
100 let mut q = query::LK_Q.clone();
101 lkq_insert_mut(&mut q, "group", "=", &*PRIVATE)?;
102 lkq_insert_mut(&mut q, "domain", "=", &**domain)?;
103 lkq_insert_mut(&mut q, "stamp", ">", &*create_after)?;
104 lkq_insert_mut(&mut q, "prefix", "=", space.as_bytes())?;
105 Ok(q)
106}
107
108#[cfg(any(feature = "lmdb", feature = "inmem"))]
109pub fn lkc_lstatus_watch(
111 status: &LkcLocalStatusArgs,
112 d_timeout: Stamp,
113 mut cb: impl crate::system::cb::PointHandler + 'static,
114) -> LkResult<bool> {
115 use crate::system::{lks_scan, lks_tap2};
116 use linkspace_core::prelude::U16;
117 use tracing::debug_span;
118
119 let span = debug_span!("status_poll", ?status, ?d_timeout);
120 let _ = span.enter();
121 let mut ok = false;
122 let mut last_request = Stamp::ZERO;
123 let mut query: LkQuery = lkc_lstatus_overwatch(status, d_timeout)?;
124 lks_scan(&query, &mut |point| {
126 if point.get_links().is_empty() && point.data().is_empty() {
127 last_request = *point.get_stamp();
128 tracing::debug!(point=%PointFmt(&point),"recently requested");
129 false
130 } else {
131 ok = true;
132 let cnt = (cb).on_point(point, Some(&status.watch));
133 tracing::debug!("recently replied");
134 cnt.is_break()
135 }
136 })?;
137 if last_request == Stamp::ZERO {
138 tracing::debug!("creating new req");
139 let req = lkc_lstatus_request(status).unwrap();
140 last_request = *req.get_stamp();
141 lks_save(&req)?;
142 }
143 let wait_until = last_request.saturating_add(d_timeout);
144 tracing::debug!(?wait_until, "Waiting until");
145 lkq_insert_mut(&mut query, "data_size", ">", &*U16::ZERO)?;
146 lkq_insert_mut(&mut query, "links_len", ">", &*U16::ZERO)?;
147 lkq_insert_mut(&mut query, "recv", "<", &*wait_until)?;
148 lkq_insert_mut(&mut query, "", "scan/0", &[])?;
149 lkq_insert_mut(&mut query, "", "watch", &status.watch)?;
150 lks_tap2(query, cb, span)?;
151 Ok(ok)
152}
153
154fn is_status_reply(status: &LkcLocalStatusArgs, rspace: &LkPath, point: &PointPtr) -> LkResult<()> {
155 anyhow::ensure!(
156 *point.get_domain() == status.domain
157 && *point.get_group() == PRIVATE
158 && point.get_path() == rspace
159 && !point.get_links().is_empty()
160 && !point.data().is_empty(),
161 "invalid status update"
162 );
163 Ok(())
164}
165
166#[cfg(any(feature = "lmdb", feature = "inmem"))]
168pub fn lkc_lstatus_set(
169 status: LkcLocalStatusArgs,
170 mut update: impl FnMut(Domain, GroupID, &LkPath, Link) -> LkResult<PointBox> + 'static,
171) -> LkResult<()> {
172 use crate::system::lks_tap2;
173 use lkp::U16;
174 use tracing::debug_span;
175
176 let span = debug_span!("status_set", ?status);
177 let _ = span.enter();
178
179 let status_path = lkc_lstatus_path(&status)?;
180 let link = Link {
181 tag: ab(b"init-status*"),
182 ptr: PRIVATE,
183 };
184 let initpoint = update(status.domain, PRIVATE, &status_path, link)?;
185 is_status_reply(&status, &status_path, &initpoint)?;
186 let mut prev = initpoint.hash();
187 tracing::debug!(?initpoint, "init status");
188 lks_save(&initpoint)?;
189 std::mem::drop(initpoint);
190
191 let mut q = query::LK_Q.clone();
192 let prefix = lkc_lstatus_path(&LkcLocalStatusArgs {
193 instance: None,
194 ..status.clone()
195 })?;
196 lkq_insert_mut(&mut q, "data_size", "=", &*U16::ZERO)?;
197 lkq_insert_mut(&mut q, "links_len", "=", &*U16::ZERO)?;
198 lkq_insert_mut(&mut q, "prefix", "=", prefix.as_bytes())?;
199 lkq_insert_mut(&mut q, "", "scan/0", b"")?;
201 lkq_insert_mut(&mut q, "", "watch", &status.watch)?;
202 lks_tap2(
203 q,
204 move |point: &dyn Point, _watch: Option<&WatchIDRef>| -> LkResult<()> {
205 let p = point.get_path();
206
207 if *p.nparts() == *status_path.nparts() && p.segments() != status_path.segments() {
208 return Ok(());
209 }
210
211 let link = Link {
212 tag: ab(b"prev-status*"),
213 ptr: prev,
214 };
215 let reply = update(status.domain, PRIVATE, &status_path, link)?;
216 is_status_reply(&status, &status_path, &reply)?;
217 prev = reply.hash();
218 tracing::debug!(?reply, "Reply status");
219 lks_save(&reply)?;
220 Ok(())
221 },
222 span,
223 )?;
224 Ok(())
225}