hydro_lang/location/
mod.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::stream::Stream as FuturesStream;
6use proc_macro2::Span;
7use stageleft::{QuotedWithContext, q};
8
9use super::builder::FlowState;
10use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
11use crate::ir::{DebugType, HydroIrMetadata, HydroNode, HydroSource};
12use crate::{Singleton, Stream, Unbounded};
13
14pub mod external_process;
15pub use external_process::ExternalProcess;
16
17pub mod process;
18pub use process::Process;
19
20pub mod cluster;
21pub use cluster::{Cluster, ClusterId};
22
23pub mod can_send;
24pub use can_send::CanSend;
25
26pub mod tick;
27pub use tick::{Atomic, NoTick, Tick};
28
/// Identifier for a location.
///
/// The `usize` payload of `Process`, `Cluster` and `ExternalProcess` is the
/// value returned by [`LocationId::raw_id`].
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum LocationId {
    /// A process, carrying its numeric id.
    Process(usize),
    /// A cluster, carrying its numeric id.
    Cluster(usize),
    /// A tick: a numeric id plus the id of the location it is nested in
    /// (the part that [`LocationId::root`] descends into).
    Tick(usize, Box<LocationId>),
    /// An external process, carrying its numeric id.
    ExternalProcess(usize),
}

impl LocationId {
    /// Returns the outermost non-tick location id.
    ///
    /// Tick ids are unwrapped recursively; every other variant is already a
    /// root and is returned as-is.
    pub fn root(&self) -> &LocationId {
        match self {
            LocationId::Tick(_, parent) => parent.root(),
            LocationId::Process(_) | LocationId::Cluster(_) | LocationId::ExternalProcess(_) => {
                self
            }
        }
    }

    /// Returns the numeric id carried by a process, cluster, or external
    /// process id.
    ///
    /// # Panics
    /// Panics when called on a [`LocationId::Tick`]. Call [`LocationId::root`]
    /// first if the id of the enclosing location is wanted.
    pub fn raw_id(&self) -> usize {
        match self {
            LocationId::Process(raw)
            | LocationId::Cluster(raw)
            | LocationId::ExternalProcess(raw) => *raw,
            LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
        }
    }
}
56
57pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
58    assert_eq!(l1.id(), l2.id(), "locations do not match");
59}
60
61pub trait Location<'a>: Clone {
62    type Root: Location<'a>;
63
64    fn root(&self) -> Self::Root;
65
66    fn id(&self) -> LocationId;
67
68    fn flow_state(&self) -> &FlowState;
69
70    fn is_top_level() -> bool;
71
72    fn tick(&self) -> Tick<Self>
73    where
74        Self: NoTick,
75    {
76        let next_id = self.flow_state().borrow_mut().next_clock_id;
77        self.flow_state().borrow_mut().next_clock_id += 1;
78        Tick {
79            id: next_id,
80            l: self.clone(),
81        }
82    }
83
84    fn next_node_id(&self) -> usize {
85        let next_id = self.flow_state().borrow_mut().next_node_id;
86        self.flow_state().borrow_mut().next_node_id += 1;
87        next_id
88    }
89
90    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
91        HydroIrMetadata {
92            location_kind: self.id(),
93            output_type: Some(DebugType(stageleft::quote_type::<T>())),
94            cardinality: None,
95            cpu_usage: None,
96        }
97    }
98
99    fn spin(&self) -> Stream<(), Self, Unbounded>
100    where
101        Self: Sized + NoTick,
102    {
103        Stream::new(
104            self.clone(),
105            HydroNode::Persist {
106                inner: Box::new(HydroNode::Source {
107                    source: HydroSource::Spin(),
108                    location_kind: self.id(),
109                    metadata: self.new_node_metadata::<()>(),
110                }),
111                metadata: self.new_node_metadata::<()>(),
112            },
113        )
114    }
115
116    fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
117        &self,
118        e: impl QuotedWithContext<'a, E, Self>,
119    ) -> Stream<T, Self, Unbounded>
120    where
121        Self: Sized + NoTick,
122    {
123        let e = e.splice_untyped_ctx(self);
124
125        Stream::new(
126            self.clone(),
127            HydroNode::Persist {
128                inner: Box::new(HydroNode::Source {
129                    source: HydroSource::Stream(e.into()),
130                    location_kind: self.id(),
131                    metadata: self.new_node_metadata::<T>(),
132                }),
133                metadata: self.new_node_metadata::<T>(),
134            },
135        )
136    }
137
138    fn source_iter<T, E: IntoIterator<Item = T>>(
139        &self,
140        e: impl QuotedWithContext<'a, E, Self>,
141    ) -> Stream<T, Self, Unbounded>
142    where
143        Self: Sized + NoTick,
144    {
145        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
146        // for bounded top-level streams, and this is the only way to generate one
147        let e = e.splice_untyped_ctx(self);
148
149        Stream::new(
150            self.clone(),
151            HydroNode::Persist {
152                inner: Box::new(HydroNode::Source {
153                    source: HydroSource::Iter(e.into()),
154                    location_kind: self.id(),
155                    metadata: self.new_node_metadata::<T>(),
156                }),
157                metadata: self.new_node_metadata::<T>(),
158            },
159        )
160    }
161
162    fn singleton<T: Clone>(
163        &self,
164        e: impl QuotedWithContext<'a, T, Self>,
165    ) -> Singleton<T, Self, Unbounded>
166    where
167        Self: Sized + NoTick,
168    {
169        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
170        // for bounded top-level singletons, and this is the only way to generate one
171
172        let e_arr = q!([e]);
173        let e = e_arr.splice_untyped_ctx(self);
174
175        // we do a double persist here because if the singleton shows up on every tick,
176        // we first persist the source so that we store that value and then persist again
177        // so that it grows every tick
178        Singleton::new(
179            self.clone(),
180            HydroNode::Persist {
181                inner: Box::new(HydroNode::Persist {
182                    inner: Box::new(HydroNode::Source {
183                        source: HydroSource::Iter(e.into()),
184                        location_kind: self.id(),
185                        metadata: self.new_node_metadata::<T>(),
186                    }),
187                    metadata: self.new_node_metadata::<T>(),
188                }),
189                metadata: self.new_node_metadata::<T>(),
190            },
191        )
192    }
193
194    /// Generates a stream with values emitted at a fixed interval, with
195    /// each value being the current time (as an [`tokio::time::Instant`]).
196    ///
197    /// The clock source used is monotonic, so elements will be emitted in
198    /// increasing order.
199    ///
200    /// # Safety
201    /// Because this stream is generated by an OS timer, it will be
202    /// non-deterministic because each timestamp will be arbitrary.
203    unsafe fn source_interval(
204        &self,
205        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
206    ) -> Stream<tokio::time::Instant, Self, Unbounded>
207    where
208        Self: Sized + NoTick,
209    {
210        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
211            tokio::time::interval(interval)
212        )))
213    }
214
215    /// Generates a stream with values emitted at a fixed interval (with an
216    /// initial delay), with each value being the current time
217    /// (as an [`tokio::time::Instant`]).
218    ///
219    /// The clock source used is monotonic, so elements will be emitted in
220    /// increasing order.
221    ///
222    /// # Safety
223    /// Because this stream is generated by an OS timer, it will be
224    /// non-deterministic because each timestamp will be arbitrary.
225    unsafe fn source_interval_delayed(
226        &self,
227        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
228        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229    ) -> Stream<tokio::time::Instant, Self, Unbounded>
230    where
231        Self: Sized + NoTick,
232    {
233        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
234            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
235        )))
236    }
237
238    fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
239        &self,
240    ) -> (ForwardRef<'a, S>, S)
241    where
242        Self: NoTick,
243    {
244        let next_id = {
245            let on_id = match self.id() {
246                LocationId::Process(id) => id,
247                LocationId::Cluster(id) => id,
248                LocationId::Tick(_, _) => panic!(),
249                LocationId::ExternalProcess(_) => panic!(),
250            };
251
252            let mut flow_state = self.flow_state().borrow_mut();
253            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
254
255            let id = *next_id_entry;
256            *next_id_entry += 1;
257            id
258        };
259
260        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
261
262        (
263            ForwardRef {
264                completed: false,
265                ident: ident.clone(),
266                expected_location: self.id(),
267                _phantom: PhantomData,
268            },
269            S::create_source(ident, self.clone()),
270        )
271    }
272}