1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::stream::Stream as FuturesStream;
6use proc_macro2::Span;
7use stageleft::{QuotedWithContext, q};
8
9use super::builder::FlowState;
10use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
11use crate::ir::{DebugType, HydroIrMetadata, HydroNode, HydroSource};
12use crate::{Singleton, Stream, Unbounded};
13
14pub mod external_process;
15pub use external_process::ExternalProcess;
16
17pub mod process;
18pub use process::Process;
19
20pub mod cluster;
21pub use cluster::{Cluster, ClusterId};
22
23pub mod can_send;
24pub use can_send::CanSend;
25
26pub mod tick;
27pub use tick::{Atomic, NoTick, Tick};
28
/// Identifier of a location.
///
/// `Process`, `Cluster` and `ExternalProcess` each carry a single numeric id.
/// `Tick` carries its own numeric id plus the (boxed) id of the location it is
/// nested in, so tick ids can be nested arbitrarily deep.
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum LocationId {
    /// A process, identified by a numeric id.
    Process(usize),
    /// A cluster, identified by a numeric id.
    Cluster(usize),
    /// A tick: `(tick id, id of the enclosing location)`.
    Tick(usize, Box<LocationId>),
    /// An external process, identified by a numeric id.
    ExternalProcess(usize),
}

impl LocationId {
    /// Returns the outermost non-tick id.
    ///
    /// Non-tick variants are their own root; a `Tick` recursively defers to
    /// the id it wraps, so any depth of nested ticks is unwrapped.
    pub fn root(&self) -> &LocationId {
        match self {
            LocationId::Process(_) | LocationId::Cluster(_) | LocationId::ExternalProcess(_) => {
                self
            }
            LocationId::Tick(_, id) => id.root(),
        }
    }

    /// Returns the numeric id stored in a non-tick variant.
    ///
    /// # Panics
    ///
    /// Panics with `"cannot get raw id for tick"` when called on
    /// [`LocationId::Tick`]. Call [`LocationId::root`] first if the id may be
    /// a tick.
    pub fn raw_id(&self) -> usize {
        match self {
            LocationId::Process(id) | LocationId::Cluster(id) | LocationId::ExternalProcess(id) => {
                *id
            }
            LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
        }
    }
}
56
57pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
58 assert_eq!(l1.id(), l2.id(), "locations do not match");
59}
60
61pub trait Location<'a>: Clone {
62 type Root: Location<'a>;
63
64 fn root(&self) -> Self::Root;
65
66 fn id(&self) -> LocationId;
67
68 fn flow_state(&self) -> &FlowState;
69
70 fn is_top_level() -> bool;
71
72 fn tick(&self) -> Tick<Self>
73 where
74 Self: NoTick,
75 {
76 let next_id = self.flow_state().borrow_mut().next_clock_id;
77 self.flow_state().borrow_mut().next_clock_id += 1;
78 Tick {
79 id: next_id,
80 l: self.clone(),
81 }
82 }
83
84 fn next_node_id(&self) -> usize {
85 let next_id = self.flow_state().borrow_mut().next_node_id;
86 self.flow_state().borrow_mut().next_node_id += 1;
87 next_id
88 }
89
90 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
91 HydroIrMetadata {
92 location_kind: self.id(),
93 output_type: Some(DebugType(stageleft::quote_type::<T>())),
94 cardinality: None,
95 cpu_usage: None,
96 }
97 }
98
99 fn spin(&self) -> Stream<(), Self, Unbounded>
100 where
101 Self: Sized + NoTick,
102 {
103 Stream::new(
104 self.clone(),
105 HydroNode::Persist {
106 inner: Box::new(HydroNode::Source {
107 source: HydroSource::Spin(),
108 location_kind: self.id(),
109 metadata: self.new_node_metadata::<()>(),
110 }),
111 metadata: self.new_node_metadata::<()>(),
112 },
113 )
114 }
115
116 fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
117 &self,
118 e: impl QuotedWithContext<'a, E, Self>,
119 ) -> Stream<T, Self, Unbounded>
120 where
121 Self: Sized + NoTick,
122 {
123 let e = e.splice_untyped_ctx(self);
124
125 Stream::new(
126 self.clone(),
127 HydroNode::Persist {
128 inner: Box::new(HydroNode::Source {
129 source: HydroSource::Stream(e.into()),
130 location_kind: self.id(),
131 metadata: self.new_node_metadata::<T>(),
132 }),
133 metadata: self.new_node_metadata::<T>(),
134 },
135 )
136 }
137
138 fn source_iter<T, E: IntoIterator<Item = T>>(
139 &self,
140 e: impl QuotedWithContext<'a, E, Self>,
141 ) -> Stream<T, Self, Unbounded>
142 where
143 Self: Sized + NoTick,
144 {
145 let e = e.splice_untyped_ctx(self);
148
149 Stream::new(
150 self.clone(),
151 HydroNode::Persist {
152 inner: Box::new(HydroNode::Source {
153 source: HydroSource::Iter(e.into()),
154 location_kind: self.id(),
155 metadata: self.new_node_metadata::<T>(),
156 }),
157 metadata: self.new_node_metadata::<T>(),
158 },
159 )
160 }
161
162 fn singleton<T: Clone>(
163 &self,
164 e: impl QuotedWithContext<'a, T, Self>,
165 ) -> Singleton<T, Self, Unbounded>
166 where
167 Self: Sized + NoTick,
168 {
169 let e_arr = q!([e]);
173 let e = e_arr.splice_untyped_ctx(self);
174
175 Singleton::new(
179 self.clone(),
180 HydroNode::Persist {
181 inner: Box::new(HydroNode::Persist {
182 inner: Box::new(HydroNode::Source {
183 source: HydroSource::Iter(e.into()),
184 location_kind: self.id(),
185 metadata: self.new_node_metadata::<T>(),
186 }),
187 metadata: self.new_node_metadata::<T>(),
188 }),
189 metadata: self.new_node_metadata::<T>(),
190 },
191 )
192 }
193
194 unsafe fn source_interval(
204 &self,
205 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
206 ) -> Stream<tokio::time::Instant, Self, Unbounded>
207 where
208 Self: Sized + NoTick,
209 {
210 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
211 tokio::time::interval(interval)
212 )))
213 }
214
215 unsafe fn source_interval_delayed(
226 &self,
227 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
228 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229 ) -> Stream<tokio::time::Instant, Self, Unbounded>
230 where
231 Self: Sized + NoTick,
232 {
233 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
234 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
235 )))
236 }
237
238 fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
239 &self,
240 ) -> (ForwardRef<'a, S>, S)
241 where
242 Self: NoTick,
243 {
244 let next_id = {
245 let on_id = match self.id() {
246 LocationId::Process(id) => id,
247 LocationId::Cluster(id) => id,
248 LocationId::Tick(_, _) => panic!(),
249 LocationId::ExternalProcess(_) => panic!(),
250 };
251
252 let mut flow_state = self.flow_state().borrow_mut();
253 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
254
255 let id = *next_id_entry;
256 *next_id_entry += 1;
257 id
258 };
259
260 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
261
262 (
263 ForwardRef {
264 completed: false,
265 ident: ident.clone(),
266 expected_location: self.id(),
267 _phantom: PhantomData,
268 },
269 S::create_source(ident, self.clone()),
270 )
271 }
272}