1use crate::prelude::*;
2
3pub fn lks_scan_links(
4 links: &[Link],
5 cb: &mut dyn FnMut(&dyn Point, &Link) -> ShouldBreak, ) -> LkResult<Vec<Link>> {
7 let mut missing = vec![];
8 system::lks_scan_hashes(
9 &mut links.iter(),
10 &mut |point: Option<&dyn Point>, link: &Link| match point {
11 Some(p) => cb(p, link),
12 None => {
13 missing.push(*link);
14 false
15 }
16 },
17 )?;
18 Ok(missing)
19}
20
21use crate::system::cb::*;
22pub fn lks_tap_links(
23 links: &[Link],
24 mut cb: impl PointHandler + 'static,
25 span: Option<tracing::Span>,
26) -> LkResult<Vec<Link>> {
27 let missing = lks_scan_links(links, &mut |point, link| {
28 cb.on_point(point, Some(link.as_bytes())).is_break()
29 })?;
30 let missing_qs = missing
31 .iter()
32 .map(|link| {
33 let mut q = crate::query::lkq_hash(link.ptr, &()).unwrap();
34 lkq_insert_mut(&mut q, "", ":watch", link.as_bytes()).unwrap();
35 q
36 })
37 .collect();
38 system::lks_multi_tap(
39 missing_qs,
40 cb,
41 false,
42 span.unwrap_or_else(|| tracing::debug_span!("tap_point", ?links)),
43 )?;
44 Ok(missing)
45}
46
47pub type OnMissing = Box<dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak>;
48pub fn lks_tap_tree<C: PointHandler + 'static>(
52 point: &dyn Point,
53 mut cb: C,
54 mut on_missing: Option<OnMissing>,
55 span: Option<tracing::Span>,
56) -> LkResult<i64> {
57 use super::tree::*;
58 use crate::lkp::LkHashMap;
59 use linkspace_system::handlers::*;
60 let mut entries = LkHashMap::default();
61 let mut initialscan = 0;
62 let mut pending = 0;
63 let span = span.unwrap_or_else(|| tracing::debug_span!("tap_tree", ?point));
64 {
65 let _g = span.enter();
66 lks_scan_tree(
67 point,
68 OnVisit {
69 on_enter: &mut |point, link| match point {
70 None => {
71 if entries.insert(link.ptr, false).is_none() {
72 pending += 1;
73 }
74 false
75 }
76 Some(p) => {
77 initialscan += 1;
78 if entries.insert(link.ptr, true).is_none() {
79 cb.on_point(p, None).is_break()
80 } else {
81 false
82 }
83 }
84 },
85 on_exit: &mut |_, _| false,
86 on_missing: &mut |p, l| on_missing.as_mut().map(|fnc| fnc(p, l)).unwrap_or(false),
87 select: &mut select_all,
88 },
89 );
90 }
91 let q = crate::query::lkq_watch(
93 crate::query::LK_Q.clone(),
94 &[b"lks_tap_tree/" as &[u8], &point.hash_ref().0].concat(),
95 );
96
97 if pending == 0 {
98 cb.on_close(&q, StopReason::Finish, initialscan, 0);
99 return Ok(initialscan as i64);
100 }
101
102 struct TreeHandler<C> {
103 entries: LkHashMap<bool>,
105 pending: usize,
106 initialscan: u64,
107 called_during_watch: u64,
108 finish: bool,
109 #[allow(clippy::type_complexity)]
110 on_missing: Option<Box<dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak>>,
111 cb: C,
112 }
113
114 impl<C: PointHandler> PointHandler for TreeHandler<C> {
115 fn on_point(&mut self, point: &dyn Point, watch: Option<&[u8]>) -> ControlFlow<()> {
116 if let Some(seen) = self.entries.get_mut(point.hash_ref()) {
117 if *seen {
118 return ControlFlow::Break(());
119 };
120 *seen = true;
121 self.called_during_watch += 1;
122 self.pending -= 1;
123 self.cb.on_point(point, watch)?;
124
125 let on_missing: &mut dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak =
126 &mut |p, l| {
127 self.on_missing
128 .as_mut()
129 .map(|fnc| fnc(p, l))
130 .unwrap_or(false)
131 };
132
133 let r = lks_scan_tree(
134 point,
135 OnVisit {
136 on_enter: &mut |point, link| match point {
137 None => {
138 match self.entries.insert(link.ptr, false) {
139 Some(true) => tracing::error!(
140 "Undefined Behaviour - lks_tap_tree - previously known point has been removed from tree"
141 ),
142 Some(false) => {}
143 None => self.pending += 1,
144 }
145 false
146 }
147 Some(p) => {
148 match self.entries.insert(link.ptr, true) {
149 Some(true) => return false,
150 Some(false) => {
151 self.pending -= 1;
152 }
153 None => {}
154 }
155 self.called_during_watch += 1;
156 self.cb.on_point(p, None).is_break()
157 }
158 },
159 on_exit: &mut |_, _| false,
160 on_missing,
161 select: &mut select_all,
162 },
163 );
164 if r {
165 return ControlFlow::Break(());
166 }
167 }
168 if self.pending == 0 {
169 self.finish = true;
170 return ControlFlow::Break(());
171 }
172 ControlFlow::Continue(())
173 }
174 fn on_close(&mut self, query: &LkQuery, reason: StopReason, _total: u64, _watched: u64) {
175 let reason = if self.finish {
176 StopReason::Finish
177 } else {
178 reason
179 };
180 self.cb.on_close(
181 query,
182 reason,
183 self.initialscan + self.called_during_watch,
184 self.called_during_watch,
185 )
186 }
187 }
188
189 crate::system::lks_tap2(
190 q,
191 TreeHandler {
192 entries,
193 cb,
194 finish: false,
195 called_during_watch: 0,
196 initialscan,
197 pending,
198 on_missing,
199 },
200 span,
201 )?;
202 Ok(-(initialscan as i64))
203}