linkspace/experimental/
tap_hashes.rs

1use crate::prelude::*;
2
3pub fn lks_scan_links(
4    links: &[Link],
5    cb: &mut dyn FnMut(&dyn Point, &Link) -> ShouldBreak, // HashTapPoint
6) -> LkResult<Vec<Link>> {
7    let mut missing = vec![];
8    system::lks_scan_hashes(
9        &mut links.iter(),
10        &mut |point: Option<&dyn Point>, link: &Link| match point {
11            Some(p) => cb(p, link),
12            None => {
13                missing.push(*link);
14                false
15            }
16        },
17    )?;
18    Ok(missing)
19}
20
21use crate::system::cb::*;
22pub fn lks_tap_links(
23    links: &[Link],
24    mut cb: impl PointHandler + 'static,
25    span: Option<tracing::Span>,
26) -> LkResult<Vec<Link>> {
27    let missing = lks_scan_links(links, &mut |point, link| {
28        cb.on_point(point, Some(link.as_bytes())).is_break()
29    })?;
30    let missing_qs = missing
31        .iter()
32        .map(|link| {
33            let mut q = crate::query::lkq_hash(link.ptr, &()).unwrap();
34            lkq_insert_mut(&mut q, "", ":watch", link.as_bytes()).unwrap();
35            q
36        })
37        .collect();
38    system::lks_multi_tap(
39        missing_qs,
40        cb,
41        false,
42        span.unwrap_or_else(|| tracing::debug_span!("tap_point", ?links)),
43    )?;
44    Ok(missing)
45}
46
/// Boxed callback type accepted as the `on_missing` argument of `lks_tap_tree`.
///
/// It is called with a point and a list of links and returns a `ShouldBreak`.
/// NOTE(review): presumably the links are those of the point that could not be resolved
/// locally — confirm against `lks_scan_tree`.
pub type OnMissing = Box<dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak>;
48/// First does a scan_tree and saves all missing hashes
49/// Then starts watching for any hash that was missing, and recursivly doing a scan_tree + saving missing hashes whenever a match is found
50/// note that on_missing is called per point, but PointHandler.on_point is only called once even it is linked multiple times
51pub fn lks_tap_tree<C: PointHandler + 'static>(
52    point: &dyn Point,
53    mut cb: C,
54    mut on_missing: Option<OnMissing>,
55    span: Option<tracing::Span>,
56) -> LkResult<i64> {
57    use super::tree::*;
58    use crate::lkp::LkHashMap;
59    use linkspace_system::handlers::*;
60    let mut entries = LkHashMap::default();
61    let mut initialscan = 0;
62    let mut pending = 0;
63    let span = span.unwrap_or_else(|| tracing::debug_span!("tap_tree", ?point));
64    {
65        let _g = span.enter();
66        lks_scan_tree(
67            point,
68            OnVisit {
69                on_enter: &mut |point, link| match point {
70                    None => {
71                        if entries.insert(link.ptr, false).is_none() {
72                            pending += 1;
73                        }
74                        false
75                    }
76                    Some(p) => {
77                        initialscan += 1;
78                        if entries.insert(link.ptr, true).is_none() {
79                            cb.on_point(p, None).is_break()
80                        } else {
81                            false
82                        }
83                    }
84                },
85                on_exit: &mut |_, _| false,
86                on_missing: &mut |p, l| on_missing.as_mut().map(|fnc| fnc(p, l)).unwrap_or(false),
87                select: &mut select_all,
88            },
89        );
90    }
91    // Create an empty watch query that scans all new points.
92    let q = crate::query::lkq_watch(
93        crate::query::LK_Q.clone(),
94        &[b"lks_tap_tree/" as &[u8], &point.hash_ref().0].concat(),
95    );
96
97    if pending == 0 {
98        cb.on_close(&q, StopReason::Finish, initialscan, 0);
99        return Ok(initialscan as i64);
100    }
101
102    struct TreeHandler<C> {
103        // key for each set to true if already seen
104        entries: LkHashMap<bool>,
105        pending: usize,
106        initialscan: u64,
107        called_during_watch: u64,
108        finish: bool,
109        #[allow(clippy::type_complexity)]
110        on_missing: Option<Box<dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak>>,
111        cb: C,
112    }
113
114    impl<C: PointHandler> PointHandler for TreeHandler<C> {
115        fn on_point(&mut self, point: &dyn Point, watch: Option<&[u8]>) -> ControlFlow<()> {
116            if let Some(seen) = self.entries.get_mut(point.hash_ref()) {
117                if *seen {
118                    return ControlFlow::Break(());
119                };
120                *seen = true;
121                self.called_during_watch += 1;
122                self.pending -= 1;
123                self.cb.on_point(point, watch)?;
124
125                let on_missing: &mut dyn FnMut(&dyn Point, Vec<Link>) -> ShouldBreak =
126                    &mut |p, l| {
127                        self.on_missing
128                            .as_mut()
129                            .map(|fnc| fnc(p, l))
130                            .unwrap_or(false)
131                    };
132
133                let r = lks_scan_tree(
134                    point,
135                    OnVisit {
136                        on_enter: &mut |point, link| match point {
137                            None => {
138                                match self.entries.insert(link.ptr, false) {
139                                    Some(true) => tracing::error!(
140                                        "Undefined Behaviour - lks_tap_tree - previously known point has been removed from tree"
141                                    ),
142                                    Some(false) => {}
143                                    None => self.pending += 1,
144                                }
145                                false
146                            }
147                            Some(p) => {
148                                match self.entries.insert(link.ptr, true) {
149                                    Some(true) => return false,
150                                    Some(false) => {
151                                        self.pending -= 1;
152                                    }
153                                    None => {}
154                                }
155                                self.called_during_watch += 1;
156                                self.cb.on_point(p, None).is_break()
157                            }
158                        },
159                        on_exit: &mut |_, _| false,
160                        on_missing,
161                        select: &mut select_all,
162                    },
163                );
164                if r {
165                    return ControlFlow::Break(());
166                }
167            }
168            if self.pending == 0 {
169                self.finish = true;
170                return ControlFlow::Break(());
171            }
172            ControlFlow::Continue(())
173        }
174        fn on_close(&mut self, query: &LkQuery, reason: StopReason, _total: u64, _watched: u64) {
175            let reason = if self.finish {
176                StopReason::Finish
177            } else {
178                reason
179            };
180            self.cb.on_close(
181                query,
182                reason,
183                self.initialscan + self.called_during_watch,
184                self.called_during_watch,
185            )
186        }
187    }
188
189    crate::system::lks_tap2(
190        q,
191        TreeHandler {
192            entries,
193            cb,
194            finish: false,
195            called_during_watch: 0,
196            initialscan,
197            pending,
198            on_missing,
199        },
200        span,
201    )?;
202    Ok(-(initialscan as i64))
203}