linkspace/commons/
pull.rs

1use crate::{point::lk_keypoint_ref, query::*, work_env::lke_domain, *};
2use anyhow::Context;
3use linkspace_core::prelude::EXCHANGE_DOMAIN;
4use point::lk_linkpoint_ref;
5fn domain(query: &LkQuery) -> LkResult<Domain> {
6    match lkq_exact_domain(query)? {
7        Some(k) => Ok(k),
8        None => Ok(crate::work_env::lke_domain()),
9    }
10}
11fn group(query: &LkQuery) -> LkResult<GroupID> {
12    match lkq_exact_group(query)? {
13        Some(g) => Ok(g),
14        None => Ok(crate::work_env::lke_group()),
15    }
16}
17
18/** Create a pull request point
19
20- \[f:exchange\]:\[#:0\]:/pull/group/\[query.group\]/domain/\[query.domain\]/\[query.watch_id\]
21
22A domain should be conservative with its query.
23Requesting too much can add significant overhead.
24
25It is up to an exchange process to fulfill the query.
26By convention the exchange process only responds with pull requests made while it was active.
27As an optimization, a exchange process might read the latest request for which query.domain == query.watch_id when a connection opens
28 **/
29pub fn lkc_pull(query: &LkQuery, add: &dyn Stmnts) -> LkResult<PointBox> {
30    tracing::debug!(?query, "Building pull");
31    let mut query = query.clone();
32    lkq_add_mut(&mut query, add, &())?;
33    let id = query.0.watch_id().context("missing :watch ID")?;
34    anyhow::ensure!(!id.is_empty(), "missing :watch ID");
35    lkc_pull_from(&query, Some(group(&query)?), domain(&query)?, Some(id))
36}
37
38pub fn lkc_get(query: &LkQuery) -> LkResult<PointBox> {
39    let mut query = query.clone();
40    query.0.watch_options = None;
41    lkc_pull_from(
42        &query,
43        Some(group(&query)?),
44        domain(&query)?,
45        Some(b":scan"),
46    )
47}
48
49pub fn lkc_pull_from(
50    query: &LkQuery,
51    group: Option<GroupID>,
52    domain: Domain,
53    id: Option<&[u8]>,
54) -> LkResult<PointBox> {
55    let data = query.0.to_string();
56    tracing::trace!(data);
57    let id = id.filter(|o| !o.is_empty()).unwrap_or(b"default");
58
59    let path = match &group {
60        Some(group) => lkpath(&[b"pull", b"group", &**group, b"domain", &*domain, id]),
61        None => lkpath(&[b"pull", b"domain", &*domain, id]),
62    };
63
64    let mut point = lk_linkpoint_ref(
65        EXCHANGE_DOMAIN,
66        PRIVATE,
67        &path,
68        data.as_bytes(),
69        &[],
70        lkp::now(),
71    )?;
72    point.update_xheader(&XFlags::REPLACABLE);
73    Ok(point.as_box())
74}
75
76/**
77FIXME: This is experimental function
78Creates pull requests designed to be send over a socket.
79Unlike lkc_pull:
80- Does not create a record in [#:0]
81- Signing is optional
82**/
83pub fn lkc_request(
84    group: GroupID,
85    query: &crate::LkQuery,
86    key: Option<&LkIdentity>,
87) -> LkResult<PointBox> {
88    let dom = lke_domain();
89    let path = lkpath(&[b"pull", b"group", &group.0, b"domain", &dom.0, b"request"]);
90
91    let data = query.0.to_string();
92    let mut p = match key {
93        Some(k) => lk_keypoint_ref(
94            &k,
95            EXCHANGE_DOMAIN,
96            group,
97            &path,
98            data.as_bytes(),
99            &[],
100            now(),
101        )?,
102        None => lk_linkpoint_ref(EXCHANGE_DOMAIN, group, &path, data.as_bytes(), &[], now())?,
103    };
104    p.update_xheader(&XFlags::DISPOSABLE);
105    Ok(p.as_box())
106}