linkspace/commons/
stream.rs

1use std::mem::size_of;
2
3use lkp::{MAX_DATA_SIZE, NO_PTR, calc_free_space};
4use point::{lk_datapoint_ref, lk_linkpoint_ref};
5use system::lks_scan_hashes;
6
7use crate::prelude::*;
8pub fn lkc_stream_size(head: &dyn Point) -> u64 {
9    match head
10        .get_links()
11        .iter()
12        .rfind(|p| p.tag.starts_with(b"+stream"))
13    {
14        None => {
15            if head.is_datapoint() {
16                head.data().len() as u64
17            } else {
18                0
19            }
20        }
21        Some(l) => u64::from_be_bytes(l.tag.split::<0, 8>().1),
22    }
23}
24
25pub fn lkc_stream_in(
26    stream: &[&[u8]],
27    key: Option<&LkIdentity>,
28    meta_data: &[u8],
29    space: &dyn TryAsSpace,
30    links: &[Link],
31    stamp: Stamp,
32    out: &mut dyn FnMut(&mut dyn NewPoint) -> LkResult,
33) -> LkResult {
34    let space = space.try_as_space(&())?;
35
36    let free = calc_free_space(key.is_some(), meta_data.len(), &space.path, links.len());
37    let free = free.max(0) as usize;
38    anyhow::ensure!(free > std::mem::size_of::<Link>(), "too much metadata");
39    let stream_len: usize = stream.iter().map(|i| i.len()).sum();
40
41    let mut hashes =
42        Vec::with_capacity(stream_len / size_of::<Link>() + 1 + (free / size_of::<Link>() + 1));
43
44    for (i, chunk) in stream
45        .iter()
46        .flat_map(|b| b.chunks(MAX_DATA_SIZE))
47        .enumerate()
48    {
49        let mut point = lk_datapoint_ref(chunk).unwrap();
50        // we add the end_offset instead of its starts - so when reading we know the exact size of the result beforehand.
51        let end_offset = i as u64 * (MAX_DATA_SIZE as u64) + chunk.len() as u64;
52        out(&mut point)?;
53        hashes.push(Link {
54            tag: AB::try_join(b"+streamp", &end_offset.to_be_bytes() as &[u8]).unwrap(),
55            ptr: point.hash(),
56        });
57    }
58
59    let streampath = lkpath(&[b"+streams"]);
60    while hashes.len() * size_of::<Link>() > free {
61        let stream_list_free = calc_free_space(false, 0, &streampath, 0) as usize;
62        let split_off = (stream_list_free / size_of::<Link>()).min(hashes.len());
63        let start = hashes.len() - split_off;
64        let end_offset = hashes[start].tag.split::<0, 8>().1;
65        let stream_list: Vec<Link> = hashes
66            .splice(
67                start..,
68                Some(Link {
69                    tag: AB::try_join(b"+streaml", &end_offset as &[u8]).unwrap(),
70                    ptr: NO_PTR,
71                }),
72            )
73            .collect();
74        let mut point = lk_linkpoint_ref(
75            space.domain,
76            space.group,
77            &streampath,
78            &[],
79            &stream_list,
80            stamp,
81        )?;
82        out(&mut point)?;
83        hashes[start].ptr = point.hash()
84    }
85
86    let links = &[links, &hashes].concat();
87    let mut point = point::lk_point_ref(
88        key,
89        space.domain,
90        space.group,
91        &space.path,
92        meta_data,
93        links,
94        stamp,
95    )?;
96    out(&mut point)
97}
98
99pub type OnBytes<'o> = &'o mut dyn FnMut(u64, &[u8], &Link) -> ShouldBreak;
100pub type OnMissing<'o> = &'o mut dyn FnMut(&Link, &dyn Point) -> ShouldBreak;
101
102/// returns true if complete.
103pub fn lkc_stream_out(
104    parent: &dyn Point,
105    on_bytes: &mut dyn FnMut(u64, &[u8], &Link) -> ShouldBreak,
106    on_missing: &mut dyn FnMut(&Link, &dyn Point) -> ShouldBreak,
107    range: Option<(u64, u64)>,
108) -> bool {
109    if let Some((start, end)) = range {
110        if start > end {
111            return true;
112        };
113    }
114    if parent.is_datapoint() {
115        let (bytes, start_offset) = take_range(parent.data().len() as u64, parent.data(), range);
116        let fake_link = Link {
117            tag: AB::try_join(b"+streamp", &(bytes.len() as u64).to_be_bytes() as &[u8]).unwrap(),
118            ptr: parent.hash(),
119        };
120
121        on_bytes(start_offset, bytes, &fake_link);
122        return true;
123    }
124
125    let mut streamlinks = parent
126        .get_links()
127        .iter()
128        .filter(|i| i.tag.starts_with(b"+streamp") || i.tag.starts_with(b"+streaml"))
129        .filter(|l| {
130            range
131                .map(|(s, e)| (s..e).contains(&u64::from_be_bytes(l.tag.split::<0, 8>().1)))
132                .unwrap_or(true)
133        });
134
135    let called = lks_scan_hashes(&mut streamlinks, &mut |result, link| {
136        let (kind, end_offsetb) = link.tag.split::<8, 8>();
137        let end_offset = u64::from_be_bytes(end_offsetb);
138        let is_sub_stream = kind.starts_with(b"+streaml");
139        match result {
140            Some(point) if is_sub_stream => lkc_stream_out(point, on_bytes, on_missing, range),
141            Some(point) => {
142                let (bytes, start_offset) = take_range(end_offset, point.data(), range);
143                on_bytes(start_offset, bytes, link)
144            }
145            None => on_missing(link, parent),
146        }
147    })
148    .unwrap();
149    called < 0
150}
151
/// Pins `take_range` on a chunk covering stream offsets [90, 100), and on a chunk longer than
/// its end offset.
#[test]
fn take_range_test() {
    // Ten zero bytes ending at stream offset 100, i.e. covering [90, 100).
    let chunk = [0u8; 10];
    // (selected range, expected number of bytes, expected start offset)
    let cases: [(Option<(u64, u64)>, usize, u64); 8] = [
        (None, 10, 90),
        (Some((0, 95)), 5, 90),
        (Some((91, 95)), 4, 91),
        (Some((91, 100)), 9, 91),
        (Some((91, 110)), 9, 91),
        (Some((80, 110)), 10, 90),
        (Some((110, 120)), 0, 110),
        (Some((0, 80)), 0, 90),
    ];
    for (range, len, start) in cases {
        assert_eq!(
            take_range(100, &chunk, range),
            (&chunk[..len], start),
            "range {:?}",
            range
        );
    }

    // Six bytes with end offset 4: only the trailing four bytes lie in the stream.
    let long = [0u8, 1, 2, 3, 4, 5];
    assert_eq!(take_range(4, &long, None), (&long[2..], 0));
    assert_eq!(take_range(4, &long, Some((0, 1))), (&long[2..3], 0));
}
190
/// Select the part of a chunk that falls inside `range`.
///
/// `data` is a chunk whose bytes end at the stream offset `data_end_offset`. If `data` is
/// longer than `data_end_offset`, only its trailing `data_end_offset` bytes are considered.
/// `range` is a half-open `[start, end)` selection in stream offsets, and `None` selects
/// everything.
///
/// Returns the selected bytes and the stream offset of the first of them. For an empty
/// selection the offset is `max(chunk start, range start)`.
fn take_range(data_end_offset: u64, data: &[u8], range: Option<(u64, u64)>) -> (&[u8], u64) {
    let chunk: &[u8] = if data.len() as u64 > data_end_offset {
        &data[data.len() - data_end_offset as usize..]
    } else {
        data
    };
    let chunk_start = data_end_offset - chunk.len() as u64;
    let (sel_start, sel_end) = match range {
        None => return (chunk, chunk_start),
        Some(r) => r,
    };
    let lo = chunk_start.max(sel_start);
    let hi = data_end_offset.min(sel_end);
    // lo >= chunk_start, so plain subtraction cannot underflow; hi may lie before the chunk.
    let from = (lo - chunk_start) as usize;
    let to = hi.saturating_sub(chunk_start) as usize;
    // An inverted or out-of-bounds window (range outside the chunk) selects nothing.
    (chunk.get(from..to).unwrap_or(&[]), lo)
}
216
217use crate::system::cb::PointHandler;
218pub fn lkc_stream_auto(
219    _point: &dyn Point,
220    _cb: impl PointHandler + 'static,
221    _pull: bool,
222) -> LkResult {
223    Ok(())
224}