hydro_lang/
runtime_context.rs

1use dfir_rs::scheduled::context::Context;
2use quote::quote;
3use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
4
5use crate::Location;
6
7pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() };
8
/// Compile-time handle that stands in for the runtime context in quoted code.
///
/// The value carries no data: its only field is a private reference to `()`,
/// which keeps the lifetime parameter `'a` in use and prevents construction
/// outside this module. It is cheap to copy and implements `Debug` so that
/// types embedding it can derive `Debug` as well.
#[derive(Clone, Copy, Debug)]
pub struct RuntimeContext<'a> {
    // Private marker: ties the handle to `'a` and blocks external construction.
    _private: &'a (),
}
13
14impl<'a, L: Location<'a>> FreeVariableWithContext<L> for RuntimeContext<'a> {
15    type O = &'a Context;
16
17    fn to_tokens(self, _ctx: &L) -> QuoteTokens {
18        QuoteTokens {
19            prelude: None,
20            expr: Some(quote!(&context)),
21        }
22    }
23}
24
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;

    use crate::*;

    /// Marker type tagging the single process used by the test flow.
    struct P1 {}

    /// Checks that `RUNTIME_CONTEXT` can be used inside a quoted closure.
    ///
    /// The flow emits `0..5` on one process, pairs each value with the current
    /// tick number read through `RUNTIME_CONTEXT`, and ships the pairs to an
    /// external process. Every value is expected to arrive in order, tagged
    /// with tick `0`.
    #[tokio::test]
    async fn runtime_context() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<P1>();
        let observer = builder.external_process::<()>();

        // Tag each emitted value with the tick in which it was processed.
        let tagged = process
            .source_iter(q!(0..5))
            .map(q!(|v| (v, RUNTIME_CONTEXT.current_tick().0)));
        let out_port = tagged.send_bincode_external(&observer);

        let nodes = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output port after deploying and before starting.
        let mut received_stream = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected_value in 0..5 {
            let received = received_stream.next().await.unwrap();
            assert_eq!(received, (expected_value, 0));
        }
    }
}