1use std::mem::size_of;
2
3use lkp::{MAX_DATA_SIZE, NO_PTR, calc_free_space};
4use point::{lk_datapoint_ref, lk_linkpoint_ref};
5use system::lks_scan_hashes;
6
7use crate::prelude::*;
8pub fn lkc_stream_size(head: &dyn Point) -> u64 {
9 match head
10 .get_links()
11 .iter()
12 .rfind(|p| p.tag.starts_with(b"+stream"))
13 {
14 None => {
15 if head.is_datapoint() {
16 head.data().len() as u64
17 } else {
18 0
19 }
20 }
21 Some(l) => u64::from_be_bytes(l.tag.split::<0, 8>().1),
22 }
23}
24
25pub fn lkc_stream_in(
26 stream: &[&[u8]],
27 key: Option<&LkIdentity>,
28 meta_data: &[u8],
29 space: &dyn TryAsSpace,
30 links: &[Link],
31 stamp: Stamp,
32 out: &mut dyn FnMut(&mut dyn NewPoint) -> LkResult,
33) -> LkResult {
34 let space = space.try_as_space(&())?;
35
36 let free = calc_free_space(key.is_some(), meta_data.len(), &space.path, links.len());
37 let free = free.max(0) as usize;
38 anyhow::ensure!(free > std::mem::size_of::<Link>(), "too much metadata");
39 let stream_len: usize = stream.iter().map(|i| i.len()).sum();
40
41 let mut hashes =
42 Vec::with_capacity(stream_len / size_of::<Link>() + 1 + (free / size_of::<Link>() + 1));
43
44 for (i, chunk) in stream
45 .iter()
46 .flat_map(|b| b.chunks(MAX_DATA_SIZE))
47 .enumerate()
48 {
49 let mut point = lk_datapoint_ref(chunk).unwrap();
50 let end_offset = i as u64 * (MAX_DATA_SIZE as u64) + chunk.len() as u64;
52 out(&mut point)?;
53 hashes.push(Link {
54 tag: AB::try_join(b"+streamp", &end_offset.to_be_bytes() as &[u8]).unwrap(),
55 ptr: point.hash(),
56 });
57 }
58
59 let streampath = lkpath(&[b"+streams"]);
60 while hashes.len() * size_of::<Link>() > free {
61 let stream_list_free = calc_free_space(false, 0, &streampath, 0) as usize;
62 let split_off = (stream_list_free / size_of::<Link>()).min(hashes.len());
63 let start = hashes.len() - split_off;
64 let end_offset = hashes[start].tag.split::<0, 8>().1;
65 let stream_list: Vec<Link> = hashes
66 .splice(
67 start..,
68 Some(Link {
69 tag: AB::try_join(b"+streaml", &end_offset as &[u8]).unwrap(),
70 ptr: NO_PTR,
71 }),
72 )
73 .collect();
74 let mut point = lk_linkpoint_ref(
75 space.domain,
76 space.group,
77 &streampath,
78 &[],
79 &stream_list,
80 stamp,
81 )?;
82 out(&mut point)?;
83 hashes[start].ptr = point.hash()
84 }
85
86 let links = &[links, &hashes].concat();
87 let mut point = point::lk_point_ref(
88 key,
89 space.domain,
90 space.group,
91 &space.path,
92 meta_data,
93 links,
94 stamp,
95 )?;
96 out(&mut point)
97}
98
99pub type OnBytes<'o> = &'o mut dyn FnMut(u64, &[u8], &Link) -> ShouldBreak;
100pub type OnMissing<'o> = &'o mut dyn FnMut(&Link, &dyn Point) -> ShouldBreak;
101
102pub fn lkc_stream_out(
104 parent: &dyn Point,
105 on_bytes: &mut dyn FnMut(u64, &[u8], &Link) -> ShouldBreak,
106 on_missing: &mut dyn FnMut(&Link, &dyn Point) -> ShouldBreak,
107 range: Option<(u64, u64)>,
108) -> bool {
109 if let Some((start, end)) = range {
110 if start > end {
111 return true;
112 };
113 }
114 if parent.is_datapoint() {
115 let (bytes, start_offset) = take_range(parent.data().len() as u64, parent.data(), range);
116 let fake_link = Link {
117 tag: AB::try_join(b"+streamp", &(bytes.len() as u64).to_be_bytes() as &[u8]).unwrap(),
118 ptr: parent.hash(),
119 };
120
121 on_bytes(start_offset, bytes, &fake_link);
122 return true;
123 }
124
125 let mut streamlinks = parent
126 .get_links()
127 .iter()
128 .filter(|i| i.tag.starts_with(b"+streamp") || i.tag.starts_with(b"+streaml"))
129 .filter(|l| {
130 range
131 .map(|(s, e)| (s..e).contains(&u64::from_be_bytes(l.tag.split::<0, 8>().1)))
132 .unwrap_or(true)
133 });
134
135 let called = lks_scan_hashes(&mut streamlinks, &mut |result, link| {
136 let (kind, end_offsetb) = link.tag.split::<8, 8>();
137 let end_offset = u64::from_be_bytes(end_offsetb);
138 let is_sub_stream = kind.starts_with(b"+streaml");
139 match result {
140 Some(point) if is_sub_stream => lkc_stream_out(point, on_bytes, on_missing, range),
141 Some(point) => {
142 let (bytes, start_offset) = take_range(end_offset, point.data(), range);
143 on_bytes(start_offset, bytes, link)
144 }
145 None => on_missing(link, parent),
146 }
147 })
148 .unwrap();
149 called < 0
150}
151
/// Table-driven checks of `take_range`: buffers ending at a given stream offset, with and
/// without a selection range, including selections that miss the buffer entirely and a buffer
/// longer than its end offset.
#[test]
fn take_range_test() {
    const ZEROS: &[u8] = &[0; 10];
    const COUNT: &[u8] = &[0, 1, 2, 3, 4, 5];
    const EMPTY: &[u8] = &[];

    // (data_end_offset, data, range, expected bytes, expected start offset)
    let cases: [(u64, &[u8], Option<(u64, u64)>, &[u8], u64); 10] = [
        (100, ZEROS, None, ZEROS, 90),
        (100, ZEROS, Some((0, 95)), &ZEROS[..5], 90),
        (100, ZEROS, Some((91, 95)), &ZEROS[..4], 91),
        (100, ZEROS, Some((91, 100)), &ZEROS[..9], 91),
        (100, ZEROS, Some((91, 110)), &ZEROS[..9], 91),
        (100, ZEROS, Some((80, 110)), ZEROS, 90),
        (100, ZEROS, Some((110, 120)), EMPTY, 110),
        (4, COUNT, None, &COUNT[2..], 0),
        (4, COUNT, Some((0, 1)), &COUNT[2..3], 0),
        (100, ZEROS, Some((0, 80)), EMPTY, 90),
    ];

    for &(end_offset, data, range, bytes, start) in cases.iter() {
        assert_eq!(take_range(end_offset, data, range), (bytes, start));
    }
}
190
/// Select the part of `data` that lies inside `range`, in stream coordinates.
///
/// `data` is taken to end at stream offset `data_end_offset`. If `data` is longer than that
/// offset, its leading bytes are dropped so that the remainder starts at offset 0.
///
/// Returns the selected bytes and the stream offset they start at:
/// * with `range == None`, the whole (possibly trimmed) buffer and its start offset;
/// * with `Some((start, end))`, the overlap of the buffer with the half-open interval
///   `start..end`. When there is no overlap the slice is empty and the offset is the larger of
///   the buffer start and `start`.
fn take_range(data_end_offset: u64, data: &[u8], range: Option<(u64, u64)>) -> (&[u8], u64) {
    let len = data.len() as u64;
    // `window` is the part of `data` that maps onto stream offsets `window_start..data_end_offset`.
    let (window, window_start) = if len > data_end_offset {
        (&data[(len - data_end_offset) as usize..], 0)
    } else {
        (data, data_end_offset - len)
    };

    match range {
        None => (window, window_start),
        Some((sel_start, sel_end)) => {
            // Clamp the selection to the window, still in stream coordinates.
            let from = window_start.max(sel_start);
            let until = data_end_offset.min(sel_end);
            // Translate to indices into `window`; `from` is never below `window_start`.
            let lo = (from - window_start) as usize;
            let hi = until.saturating_sub(window_start) as usize;
            // `get` yields `None` for an inverted or out-of-bounds span, i.e. no overlap.
            (window.get(lo..hi).unwrap_or(&[]), from)
        }
    }
}
216
217use crate::system::cb::PointHandler;
218pub fn lkc_stream_auto(
219 _point: &dyn Point,
220 _cb: impl PointHandler + 'static,
221 _pull: bool,
222) -> LkResult {
223 Ok(())
224}