1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
26#[derive(Clone, Hash)]
27pub struct DebugExpr(pub syn::Expr);
28
29impl From<syn::Expr> for DebugExpr {
30 fn from(expr: syn::Expr) -> DebugExpr {
31 DebugExpr(expr)
32 }
33}
34
35impl Deref for DebugExpr {
36 type Target = syn::Expr;
37
38 fn deref(&self) -> &Self::Target {
39 &self.0
40 }
41}
42
43impl ToTokens for DebugExpr {
44 fn to_tokens(&self, tokens: &mut TokenStream) {
45 self.0.to_tokens(tokens);
46 }
47}
48
49impl Debug for DebugExpr {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "{}", self.0.to_token_stream())
52 }
53}
54
55#[derive(Clone, Hash)]
56pub struct DebugType(pub syn::Type);
57
58impl From<syn::Type> for DebugType {
59 fn from(t: syn::Type) -> DebugType {
60 DebugType(t)
61 }
62}
63
64impl Deref for DebugType {
65 type Target = syn::Type;
66
67 fn deref(&self) -> &Self::Target {
68 &self.0
69 }
70}
71
72impl ToTokens for DebugType {
73 fn to_tokens(&self, tokens: &mut TokenStream) {
74 self.0.to_tokens(tokens);
75 }
76}
77
78impl Debug for DebugType {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}", self.0.to_token_stream())
81 }
82}
83
/// Instantiation state of a [`HydroNode::Network`] node.
///
/// A network node starts out as [`DebugInstantiate::Building`] and is switched to
/// [`DebugInstantiate::Finalized`] by `HydroNode::compile_network`.
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(
    clippy::large_enum_variant,
    reason = "`Building` is just equivalent to `None`."
)]
pub enum DebugInstantiate {
    /// The network has not been instantiated yet.
    Building,
    /// The network has been instantiated. The fields are, in order: the sink expression,
    /// the source expression, and a one-shot connect closure. The closure is taken out
    /// (leaving `None`) and invoked by `HydroNode::connect_network`.
    Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
}
93
94impl Debug for DebugInstantiate {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 write!(f, "<network instantiate>")
97 }
98}
99
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so every `DebugInstantiate` value contributes
    /// identically to the hash of whatever contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
105
106impl Clone for DebugInstantiate {
107 fn clone(&self) -> Self {
108 match self {
109 DebugInstantiate::Building => DebugInstantiate::Building,
110 DebugInstantiate::Finalized(_, _, _) => {
111 panic!("DebugInstantiate::Finalized should not be cloned")
112 }
113 }
114 }
115}
116
/// The kind of input feeding a [`HydroNode::Source`] node.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Emitted as `source_stream(<expr>)`.
    Stream(DebugExpr),
    /// No DFIR statement is emitted for this source; `HydroNode::emit_core` returns a
    /// `DUMMY` identifier for it.
    ExternalNetwork(),
    /// Emitted as `source_iter(<expr>)`.
    Iter(DebugExpr),
    /// Emitted as `spin()`.
    Spin(),
}
125
/// Selects what `emit_core` does for each visited leaf/node: either append DFIR
/// statements to per-location graph builders, or invoke user callbacks.
///
/// `L` is the callback type for leaves and `N` the callback type for nodes; both receive
/// the visited item and the current statement-id counter.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
> {
    /// Graph builders keyed by the numeric location id.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Leaf callback and node callback, in that order.
    Callback(L, N),
}
135
/// A terminal operator of the IR: it consumes the stream produced by `input` and has no
/// output of its own.
#[derive(Debug, Hash)]
pub enum HydroLeaf {
    /// Emitted as `<input> -> for_each(<f>)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<input> -> dest_sink(<sink>)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<ident> = <input>`, binding the input stream to the cycle identifier.
    /// The root of `location_kind` must match the location of `input` (asserted at emit).
    CycleSink {
        ident: syn::Ident,
        location_kind: LocationId,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
158
159impl HydroLeaf {
160 #[cfg(feature = "build")]
161 pub fn compile_network<'a, D: Deploy<'a>>(
162 &mut self,
163 compile_env: &D::CompileEnv,
164 seen_tees: &mut SeenTees,
165 seen_tee_locations: &mut SeenTeeLocations,
166 processes: &HashMap<usize, D::Process>,
167 clusters: &HashMap<usize, D::Cluster>,
168 externals: &HashMap<usize, D::ExternalProcess>,
169 ) {
170 self.transform_children(
171 |n, s| {
172 n.compile_network::<D>(
173 compile_env,
174 s,
175 seen_tee_locations,
176 processes,
177 clusters,
178 externals,
179 );
180 },
181 seen_tees,
182 )
183 }
184
185 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
186 self.transform_children(
187 |n, s| {
188 n.connect_network(s);
189 },
190 seen_tees,
191 )
192 }
193
194 pub fn transform_bottom_up(
195 &mut self,
196 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
197 transform_node: &mut impl FnMut(&mut HydroNode),
198 seen_tees: &mut SeenTees,
199 ) {
200 self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
201
202 transform_leaf(self);
203 }
204
205 pub fn transform_children(
206 &mut self,
207 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
208 seen_tees: &mut SeenTees,
209 ) {
210 match self {
211 HydroLeaf::ForEach { f: _, input, .. }
212 | HydroLeaf::DestSink { sink: _, input, .. }
213 | HydroLeaf::CycleSink {
214 ident: _,
215 location_kind: _,
216 input,
217 ..
218 } => {
219 transform(input, seen_tees);
220 }
221 }
222 }
223
224 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
225 match self {
226 HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
227 f: f.clone(),
228 input: Box::new(input.deep_clone(seen_tees)),
229 metadata: metadata.clone(),
230 },
231 HydroLeaf::DestSink {
232 sink,
233 input,
234 metadata,
235 } => HydroLeaf::DestSink {
236 sink: sink.clone(),
237 input: Box::new(input.deep_clone(seen_tees)),
238 metadata: metadata.clone(),
239 },
240 HydroLeaf::CycleSink {
241 ident,
242 location_kind,
243 input,
244 metadata,
245 } => HydroLeaf::CycleSink {
246 ident: ident.clone(),
247 location_kind: location_kind.clone(),
248 input: Box::new(input.deep_clone(seen_tees)),
249 metadata: metadata.clone(),
250 },
251 }
252 }
253
254 #[cfg(feature = "build")]
255 pub fn emit(
256 &mut self,
257 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
258 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
259 next_stmt_id: &mut usize,
260 ) {
261 self.emit_core(
262 &mut BuildersOrCallback::Builders::<
263 fn(&mut HydroLeaf, &mut usize),
264 fn(&mut HydroNode, &mut usize),
265 >(graph_builders),
266 built_tees,
267 next_stmt_id,
268 );
269 }
270
    /// Emits the input subtree, then handles this leaf according to
    /// `builders_or_callback`.
    ///
    /// In builders mode a DFIR statement is appended to the builder of the input's
    /// location. In callback mode the leaf callback is invoked for `ForEach` and
    /// `DestSink`; nothing is invoked for `CycleSink`.
    ///
    /// `built_tees` is forwarded to [`HydroNode::emit_core`]. `next_stmt_id` is the
    /// running statement counter; it is advanced by one for `ForEach` and `DestSink` but
    /// not for `CycleSink`.
    ///
    /// # Panics
    ///
    /// For `CycleSink`, panics if the root of `location_kind` is a `Tick` or an
    /// `ExternalProcess`, or if it differs from the location of the input.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroLeaf, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroLeaf::ForEach { f, input, .. } => {
                // The input is emitted first so its statements get the lower ids.
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .entry(input_location_id)
                            .or_default()
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroLeaf::DestSink { sink, input, .. } => {
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .entry(input_location_id)
                            .or_default()
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroLeaf::CycleSink {
                ident,
                location_kind,
                input,
                ..
            } => {
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                // Only process and cluster roots carry a usable numeric id here.
                let location_id = match location_kind.root() {
                    LocationId::Process(id) => id,
                    LocationId::Cluster(id) => id,
                    LocationId::Tick(_, _) => panic!(),
                    LocationId::ExternalProcess(_) => panic!(),
                };

                assert_eq!(
                    input_location_id, *location_id,
                    "cycle_sink location mismatch"
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Binds the cycle identifier to the input stream; no statement
                        // id is attached to this statement.
                        graph_builders.entry(*location_id).or_default().add_dfir(
                            parse_quote! {
                                #ident = #input_ident;
                            },
                            None,
                            None,
                        );
                    }
                    // The leaf callback is not invoked for cycle sinks.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
369
370 pub fn metadata(&self) -> &HydroIrMetadata {
371 match self {
372 HydroLeaf::ForEach { metadata, .. }
373 | HydroLeaf::DestSink { metadata, .. }
374 | HydroLeaf::CycleSink { metadata, .. } => metadata,
375 }
376 }
377
378 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
379 match self {
380 HydroLeaf::ForEach { metadata, .. }
381 | HydroLeaf::DestSink { metadata, .. }
382 | HydroLeaf::CycleSink { metadata, .. } => metadata,
383 }
384 }
385
386 pub fn print_root(&self) -> String {
387 match self {
388 HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
389 HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
390 HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
391 }
392 }
393}
394
/// Emits every leaf of `ir` into a fresh set of per-location graph builders and returns
/// them. Statement ids start at zero and are shared across all leaves.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders = BTreeMap::new();
    let mut tee_idents = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(&mut graph_builders, &mut tee_idents, &mut stmt_counter);
    });

    graph_builders
}
405
/// Walks `ir` in emission order without building any graph, invoking `transform_leaf`
/// and `transform_node` with the statement-id counter at each visited item.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_leaf, transform_node);
    let mut visited_tees = HashMap::new();
    let mut stmt_counter = 0;

    for leaf in ir.iter_mut() {
        leaf.emit_core(&mut callbacks, &mut visited_tees, &mut stmt_counter);
    }
}
419
420pub fn transform_bottom_up(
421 ir: &mut [HydroLeaf],
422 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
423 transform_node: &mut impl FnMut(&mut HydroNode),
424) {
425 let mut seen_tees = HashMap::new();
426 ir.iter_mut().for_each(|leaf| {
427 leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
428 });
429}
430
431pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
432 let mut seen_tees = HashMap::new();
433 ir.iter()
434 .map(|leaf| leaf.deep_clone(&mut seen_tees))
435 .collect()
436}
437
438type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
439thread_local! {
440 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
441}
442
443pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
444 PRINTED_TEES.with(|printed_tees| {
445 let mut printed_tees_mut = printed_tees.borrow_mut();
446 *printed_tees_mut = Some((0, HashMap::new()));
447 drop(printed_tees_mut);
448
449 let ret = f();
450
451 let mut printed_tees_mut = printed_tees.borrow_mut();
452 *printed_tees_mut = None;
453
454 ret
455 })
456}
457
/// Shared, interior-mutable handle to the node feeding a [`HydroNode::Tee`]. Several tee
/// nodes may hold the same `Rc`; the address of the inner `RefCell` is used elsewhere in
/// this file as the identity of the shared node.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
459
impl Debug for TeeNode {
    /// Prints the shared node, optionally deduplicated.
    ///
    /// When the thread-local `PRINTED_TEES` table is active (see `dbg_dedup_tee`), the
    /// first occurrence of a shared node prints as `<tee N>: <contents>` and every later
    /// occurrence prints only `<tee N>`. When the table is inactive, every occurrence
    /// prints as `<tee>: <contents>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        PRINTED_TEES.with(|printed_tees| {
            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
            let printed_tees_mut = printed_tees_mut_borrow.as_mut();

            if let Some(printed_tees_mut) = printed_tees_mut {
                // Shared nodes are identified by the address of their `RefCell`.
                if let Some(existing) = printed_tees_mut
                    .1
                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
                {
                    write!(f, "<tee {}>", existing)
                } else {
                    let next_id = printed_tees_mut.0;
                    printed_tees_mut.0 += 1;
                    printed_tees_mut
                        .1
                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
                    // Release the table before recursing: nested tees inside the
                    // contents borrow it again.
                    drop(printed_tees_mut_borrow);
                    write!(f, "<tee {}>: ", next_id)?;
                    Debug::fmt(&self.0.borrow(), f)
                }
            } else {
                // Same reason as above: the recursive print re-enters this function.
                drop(printed_tees_mut_borrow);
                write!(f, "<tee>: ")?;
                Debug::fmt(&self.0.borrow(), f)
            }
        })
    }
}
490
491impl Hash for TeeNode {
492 fn hash<H: Hasher>(&self, state: &mut H) {
493 self.0.borrow_mut().hash(state);
494 }
495}
496
/// Side information attached to every leaf and node of the IR.
///
/// The `Hash` and `PartialEq` implementations below deliberately ignore all fields, so
/// metadata never influences hashing or equality of the values that contain it.
#[derive(Debug, Clone)]
pub struct HydroIrMetadata {
    /// Location associated with the owning leaf/node.
    pub location_kind: LocationId,
    /// Output type of the owning node, when known.
    pub output_type: Option<DebugType>,
    // NOTE(review): presumably an element-count estimate or measurement; its unit and
    // who fills it in are not visible in this file. Confirm before relying on it.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably a CPU-usage measurement; scale and unit are not visible
    // in this file. Confirm before relying on it.
    pub cpu_usage: Option<f64>,
}

impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}

impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal regardless of contents.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
517
/// An intermediate operator of the IR. Each variant other than `Placeholder`, `Source`
/// and `CycleSource` owns one or two upstream nodes, and every variant other than
/// `Placeholder` carries a [`HydroIrMetadata`].
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(clippy::large_enum_variant, reason = "TODO(mingwei):")]
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in written into a tee's shared cell while that cell is being
    /// transformed or cloned. `transform_children` and `emit_core` panic if they
    /// encounter it.
    Placeholder,

    /// A stream origin; see [`HydroSource`] for the kinds.
    Source {
        source: HydroSource,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// Reads from the cycle named `ident`; emission returns `ident` itself as the
    /// stream identifier and adds no statement.
    CycleSource {
        ident: syn::Ident,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared upstream node. Emitted as `tee()` the first time the
    /// shared node is seen; later consumers reuse the same identifier.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `persist::<'static>()`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Marker node; `emit_core` panics if one is still present at emission time.
    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `multiset_delta()`.
    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on input 0 and `second` on input 1; both
    /// inputs must be in the same location.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Sends `input` to `to_location`. `instantiate_fn` starts as
    /// `DebugInstantiate::Building` and is finalized by `compile_network`.
    Network {
        from_key: Option<usize>,
        to_location: LocationId,
        to_key: Option<usize>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
690
/// Maps the address of an original tee cell to the cell that replaces it, so that a node
/// shared by several tees is transformed or cloned only once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to the location recorded the first time
/// `HydroNode::compile_network` visited it.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
693
694impl<'a> HydroNode {
    /// Instantiates every [`HydroNode::Network`] node in this subtree.
    ///
    /// The subtree is walked bottom-up while tracking the location of the most recently
    /// visited source, cycle source, network or tee. Each network node still in the
    /// `Building` state is passed to `instantiate_network` with that tracked location as
    /// its origin, and its `instantiate_fn` is replaced by the resulting `Finalized`
    /// value.
    ///
    /// # Panics
    ///
    /// Panics if a network node is already finalized, or if a network or a first-seen
    /// tee is reached before any location has been recorded.
    #[cfg(feature = "build")]
    pub fn compile_network<D: Deploy<'a>>(
        &mut self,
        compile_env: &D::CompileEnv,
        seen_tees: &mut SeenTees,
        seen_tee_locations: &mut SeenTeeLocations,
        nodes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::ExternalProcess>,
    ) {
        let mut curr_location = None;

        self.transform_bottom_up(
            &mut |n| {
                if let HydroNode::Network {
                    from_key,
                    to_location,
                    to_key,
                    instantiate_fn,
                    ..
                } = n
                {
                    // Children were visited first, so `curr_location` is still the
                    // location of this network's input at this point.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            curr_location.as_ref().unwrap(),
                            *from_key,
                            to_location,
                            *to_key,
                            nodes,
                            clusters,
                            externals,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_, _, _) => panic!("network already finalized"),
                    };

                    *instantiate_fn =
                        DebugInstantiate::Finalized(sink_expr, source_expr, Some(connect_fn));
                }

                // Update the tracked location for whatever consumes this node next.
                match n {
                    HydroNode::Network {
                        to_location: location_kind,
                        ..
                    }
                    | HydroNode::CycleSource { location_kind, .. }
                    | HydroNode::Source { location_kind, .. } => {
                        if let LocationId::Tick(_, tick_loc) = location_kind {
                            // A tick location is replaced by the location it wraps.
                            curr_location = Some(*tick_loc.clone());
                        } else {
                            curr_location = Some(location_kind.clone());
                        }
                    }
                    HydroNode::Tee { inner, .. } => {
                        let inner_ref = inner.0.as_ref() as *const RefCell<HydroNode>;
                        if let Some(tee_location) = seen_tee_locations.get(&inner_ref) {
                            // The shared subtree is not walked again for an
                            // already-seen tee, so restore the location recorded for it.
                            curr_location = Some(tee_location.clone());
                        } else {
                            seen_tee_locations
                                .insert(inner_ref, curr_location.as_ref().unwrap().clone());
                        }
                    }
                    _ => {}
                }
            },
            seen_tees,
        );
    }
766
767 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
768 self.transform_bottom_up(
769 &mut |n| {
770 if let HydroNode::Network { instantiate_fn, .. } = n {
771 match instantiate_fn {
772 DebugInstantiate::Building => panic!("network not built"),
773
774 DebugInstantiate::Finalized(_, _, connect_fn) => {
775 (connect_fn.take().unwrap())();
776 }
777 }
778 }
779 },
780 seen_tees,
781 );
782 }
783
784 pub fn transform_bottom_up(
785 &mut self,
786 transform: &mut impl FnMut(&mut HydroNode),
787 seen_tees: &mut SeenTees,
788 ) {
789 self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
790
791 transform(self);
792 }
793
    /// Calls `transform` on each direct child of this node, in field order.
    ///
    /// Sources have no children. For a tee, the shared node is transformed only the
    /// first time it is reached; later tees pointing at the same original cell are
    /// simply re-pointed at the already-transformed cell recorded in `seen_tees`.
    ///
    /// # Panics
    ///
    /// Panics when called on [`HydroNode::Placeholder`].
    #[inline(always)]
    pub fn transform_children(
        &mut self,
        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
        seen_tees: &mut SeenTees,
    ) {
        match self {
            HydroNode::Placeholder => {
                panic!();
            }

            HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}

            HydroNode::Tee { inner, .. } => {
                if let Some(transformed) =
                    seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
                {
                    *inner = TeeNode(transformed.clone());
                } else {
                    // Register the replacement cell under the ORIGINAL cell's address
                    // before recursing.
                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
                    seen_tees.insert(
                        inner.0.as_ref() as *const RefCell<HydroNode>,
                        transformed_cell.clone(),
                    );
                    // Move the node out of the original cell (leaving a placeholder
                    // behind), transform it, and store the result in the new cell.
                    let mut orig = inner.0.replace(HydroNode::Placeholder);
                    transform(&mut orig, seen_tees);
                    *transformed_cell.borrow_mut() = orig;
                    *inner = TeeNode(transformed_cell);
                }
            }

            HydroNode::Persist { inner, .. }
            | HydroNode::Unpersist { inner, .. }
            | HydroNode::Delta { inner, .. } => {
                transform(inner.as_mut(), seen_tees);
            }

            HydroNode::Chain { first, second, .. } => {
                transform(first.as_mut(), seen_tees);
                transform(second.as_mut(), seen_tees);
            }

            HydroNode::CrossSingleton { left, right, .. }
            | HydroNode::CrossProduct { left, right, .. }
            | HydroNode::Join { left, right, .. } => {
                transform(left.as_mut(), seen_tees);
                transform(right.as_mut(), seen_tees);
            }

            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
                transform(pos.as_mut(), seen_tees);
                transform(neg.as_mut(), seen_tees);
            }

            HydroNode::Map { input, .. }
            | HydroNode::ResolveFutures { input, .. }
            | HydroNode::ResolveFuturesOrdered { input, .. }
            | HydroNode::FlatMap { input, .. }
            | HydroNode::Filter { input, .. }
            | HydroNode::FilterMap { input, .. }
            | HydroNode::Sort { input, .. }
            | HydroNode::DeferTick { input, .. }
            | HydroNode::Enumerate { input, .. }
            | HydroNode::Inspect { input, .. }
            | HydroNode::Unique { input, .. }
            | HydroNode::Network { input, .. }
            | HydroNode::Fold { input, .. }
            | HydroNode::FoldKeyed { input, .. }
            | HydroNode::Reduce { input, .. }
            | HydroNode::ReduceKeyed { input, .. }
            | HydroNode::Counter { input, .. } => {
                transform(input.as_mut(), seen_tees);
            }
        }
    }
869
870 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
871 match self {
872 HydroNode::Placeholder => HydroNode::Placeholder,
873 HydroNode::Source {
874 source,
875 location_kind,
876 metadata,
877 } => HydroNode::Source {
878 source: source.clone(),
879 location_kind: location_kind.clone(),
880 metadata: metadata.clone(),
881 },
882 HydroNode::CycleSource {
883 ident,
884 location_kind,
885 metadata,
886 } => HydroNode::CycleSource {
887 ident: ident.clone(),
888 location_kind: location_kind.clone(),
889 metadata: metadata.clone(),
890 },
891 HydroNode::Tee { inner, metadata } => {
892 if let Some(transformed) =
893 seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
894 {
895 HydroNode::Tee {
896 inner: TeeNode(transformed.clone()),
897 metadata: metadata.clone(),
898 }
899 } else {
900 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
901 seen_tees.insert(
902 inner.0.as_ref() as *const RefCell<HydroNode>,
903 new_rc.clone(),
904 );
905 let cloned = inner.0.borrow().deep_clone(seen_tees);
906 *new_rc.borrow_mut() = cloned;
907 HydroNode::Tee {
908 inner: TeeNode(new_rc),
909 metadata: metadata.clone(),
910 }
911 }
912 }
913 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
914 inner: Box::new(inner.deep_clone(seen_tees)),
915 metadata: metadata.clone(),
916 },
917 HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
918 inner: Box::new(inner.deep_clone(seen_tees)),
919 metadata: metadata.clone(),
920 },
921 HydroNode::Delta { inner, metadata } => HydroNode::Delta {
922 inner: Box::new(inner.deep_clone(seen_tees)),
923 metadata: metadata.clone(),
924 },
925 HydroNode::Chain {
926 first,
927 second,
928 metadata,
929 } => HydroNode::Chain {
930 first: Box::new(first.deep_clone(seen_tees)),
931 second: Box::new(second.deep_clone(seen_tees)),
932 metadata: metadata.clone(),
933 },
934 HydroNode::CrossProduct {
935 left,
936 right,
937 metadata,
938 } => HydroNode::CrossProduct {
939 left: Box::new(left.deep_clone(seen_tees)),
940 right: Box::new(right.deep_clone(seen_tees)),
941 metadata: metadata.clone(),
942 },
943 HydroNode::CrossSingleton {
944 left,
945 right,
946 metadata,
947 } => HydroNode::CrossSingleton {
948 left: Box::new(left.deep_clone(seen_tees)),
949 right: Box::new(right.deep_clone(seen_tees)),
950 metadata: metadata.clone(),
951 },
952 HydroNode::Join {
953 left,
954 right,
955 metadata,
956 } => HydroNode::Join {
957 left: Box::new(left.deep_clone(seen_tees)),
958 right: Box::new(right.deep_clone(seen_tees)),
959 metadata: metadata.clone(),
960 },
961 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
962 pos: Box::new(pos.deep_clone(seen_tees)),
963 neg: Box::new(neg.deep_clone(seen_tees)),
964 metadata: metadata.clone(),
965 },
966 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
967 pos: Box::new(pos.deep_clone(seen_tees)),
968 neg: Box::new(neg.deep_clone(seen_tees)),
969 metadata: metadata.clone(),
970 },
971 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
972 input: Box::new(input.deep_clone(seen_tees)),
973 metadata: metadata.clone(),
974 },
975 HydroNode::ResolveFuturesOrdered { input, metadata } => {
976 HydroNode::ResolveFuturesOrdered {
977 input: Box::new(input.deep_clone(seen_tees)),
978 metadata: metadata.clone(),
979 }
980 }
981 HydroNode::Map { f, input, metadata } => HydroNode::Map {
982 f: f.clone(),
983 input: Box::new(input.deep_clone(seen_tees)),
984 metadata: metadata.clone(),
985 },
986 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
987 f: f.clone(),
988 input: Box::new(input.deep_clone(seen_tees)),
989 metadata: metadata.clone(),
990 },
991 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
992 f: f.clone(),
993 input: Box::new(input.deep_clone(seen_tees)),
994 metadata: metadata.clone(),
995 },
996 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
997 f: f.clone(),
998 input: Box::new(input.deep_clone(seen_tees)),
999 metadata: metadata.clone(),
1000 },
1001 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1002 input: Box::new(input.deep_clone(seen_tees)),
1003 metadata: metadata.clone(),
1004 },
1005 HydroNode::Enumerate {
1006 is_static,
1007 input,
1008 metadata,
1009 } => HydroNode::Enumerate {
1010 is_static: *is_static,
1011 input: Box::new(input.deep_clone(seen_tees)),
1012 metadata: metadata.clone(),
1013 },
1014 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1015 f: f.clone(),
1016 input: Box::new(input.deep_clone(seen_tees)),
1017 metadata: metadata.clone(),
1018 },
1019 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1020 input: Box::new(input.deep_clone(seen_tees)),
1021 metadata: metadata.clone(),
1022 },
1023 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1024 input: Box::new(input.deep_clone(seen_tees)),
1025 metadata: metadata.clone(),
1026 },
1027 HydroNode::Fold {
1028 init,
1029 acc,
1030 input,
1031 metadata,
1032 } => HydroNode::Fold {
1033 init: init.clone(),
1034 acc: acc.clone(),
1035 input: Box::new(input.deep_clone(seen_tees)),
1036 metadata: metadata.clone(),
1037 },
1038 HydroNode::FoldKeyed {
1039 init,
1040 acc,
1041 input,
1042 metadata,
1043 } => HydroNode::FoldKeyed {
1044 init: init.clone(),
1045 acc: acc.clone(),
1046 input: Box::new(input.deep_clone(seen_tees)),
1047 metadata: metadata.clone(),
1048 },
1049 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1050 f: f.clone(),
1051 input: Box::new(input.deep_clone(seen_tees)),
1052 metadata: metadata.clone(),
1053 },
1054 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1055 f: f.clone(),
1056 input: Box::new(input.deep_clone(seen_tees)),
1057 metadata: metadata.clone(),
1058 },
1059 HydroNode::Network {
1060 from_key,
1061 to_location,
1062 to_key,
1063 serialize_fn,
1064 instantiate_fn,
1065 deserialize_fn,
1066 input,
1067 metadata,
1068 } => HydroNode::Network {
1069 from_key: *from_key,
1070 to_location: to_location.clone(),
1071 to_key: *to_key,
1072 serialize_fn: serialize_fn.clone(),
1073 instantiate_fn: instantiate_fn.clone(),
1074 deserialize_fn: deserialize_fn.clone(),
1075 input: Box::new(input.deep_clone(seen_tees)),
1076 metadata: metadata.clone(),
1077 },
1078 HydroNode::Counter {
1079 tag,
1080 duration,
1081 input,
1082 metadata,
1083 } => HydroNode::Counter {
1084 tag: tag.clone(),
1085 duration: duration.clone(),
1086 input: Box::new(input.deep_clone(seen_tees)),
1087 metadata: metadata.clone(),
1088 },
1089 }
1090 }
1091
1092 #[cfg(feature = "build")]
1093 pub fn emit_core(
1094 &mut self,
1095 builders_or_callback: &mut BuildersOrCallback<
1096 impl FnMut(&mut HydroLeaf, &mut usize),
1097 impl FnMut(&mut HydroNode, &mut usize),
1098 >,
1099 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1100 next_stmt_id: &mut usize,
1101 ) -> (syn::Ident, usize) {
1102 match self {
1103 HydroNode::Placeholder => {
1104 panic!()
1105 }
1106
1107 HydroNode::Persist { inner, .. } => {
1108 let (inner_ident, location) =
1109 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1110
1111 let persist_ident =
1112 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1113
1114 match builders_or_callback {
1115 BuildersOrCallback::Builders(graph_builders) => {
1116 let builder = graph_builders.entry(location).or_default();
1117 builder.add_dfir(
1118 parse_quote! {
1119 #persist_ident = #inner_ident -> persist::<'static>();
1120 },
1121 None,
1122 Some(&next_stmt_id.to_string()),
1123 );
1124 }
1125 BuildersOrCallback::Callback(_, node_callback) => {
1126 node_callback(self, next_stmt_id);
1127 }
1128 }
1129
1130 *next_stmt_id += 1;
1131
1132 (persist_ident, location)
1133 }
1134
1135 HydroNode::Unpersist { .. } => {
1136 panic!(
1137 "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1138 )
1139 }
1140
1141 HydroNode::Delta { inner, .. } => {
1142 let (inner_ident, location) =
1143 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1144
1145 let delta_ident =
1146 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1147
1148 match builders_or_callback {
1149 BuildersOrCallback::Builders(graph_builders) => {
1150 let builder = graph_builders.entry(location).or_default();
1151 builder.add_dfir(
1152 parse_quote! {
1153 #delta_ident = #inner_ident -> multiset_delta();
1154 },
1155 None,
1156 Some(&next_stmt_id.to_string()),
1157 );
1158 }
1159 BuildersOrCallback::Callback(_, node_callback) => {
1160 node_callback(self, next_stmt_id);
1161 }
1162 }
1163
1164 *next_stmt_id += 1;
1165
1166 (delta_ident, location)
1167 }
1168
1169 HydroNode::Source {
1170 source,
1171 location_kind,
1172 ..
1173 } => {
1174 let location_id = match location_kind.clone() {
1175 LocationId::Process(id) => id,
1176 LocationId::Cluster(id) => id,
1177 LocationId::Tick(_, _) => panic!(),
1178 LocationId::ExternalProcess(id) => id,
1179 };
1180
1181 if let HydroSource::ExternalNetwork() = source {
1182 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1183 } else {
1184 let source_ident =
1185 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1186
1187 let source_stmt = match source {
1188 HydroSource::Stream(expr) => {
1189 parse_quote! {
1190 #source_ident = source_stream(#expr);
1191 }
1192 }
1193
1194 HydroSource::ExternalNetwork() => {
1195 unreachable!()
1196 }
1197
1198 HydroSource::Iter(expr) => {
1199 parse_quote! {
1200 #source_ident = source_iter(#expr);
1201 }
1202 }
1203
1204 HydroSource::Spin() => {
1205 parse_quote! {
1206 #source_ident = spin();
1207 }
1208 }
1209 };
1210
1211 match builders_or_callback {
1212 BuildersOrCallback::Builders(graph_builders) => {
1213 let builder = graph_builders.entry(location_id).or_default();
1214 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1215 }
1216 BuildersOrCallback::Callback(_, node_callback) => {
1217 node_callback(self, next_stmt_id);
1218 }
1219 }
1220
1221 *next_stmt_id += 1;
1222
1223 (source_ident, location_id)
1224 }
1225 }
1226
1227 HydroNode::CycleSource {
1228 ident,
1229 location_kind,
1230 ..
1231 } => {
1232 let location_id = *match location_kind.root() {
1233 LocationId::Process(id) => id,
1234 LocationId::Cluster(id) => id,
1235 LocationId::Tick(_, _) => panic!(),
1236 LocationId::ExternalProcess(_) => panic!(),
1237 };
1238
1239 let ident = ident.clone();
1240
1241 match builders_or_callback {
1242 BuildersOrCallback::Builders(_) => {}
1243 BuildersOrCallback::Callback(_, node_callback) => {
1244 node_callback(self, next_stmt_id);
1245 }
1246 }
1247
1248 *next_stmt_id += 1;
1250
1251 (ident, location_id)
1252 }
1253
1254 HydroNode::Tee { inner, .. } => {
1255 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1256 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1257 {
1258 match builders_or_callback {
1259 BuildersOrCallback::Builders(_) => {}
1260 BuildersOrCallback::Callback(_, node_callback) => {
1261 node_callback(self, next_stmt_id);
1262 }
1263 }
1264
1265 (teed_from.clone(), *inner_location_id)
1266 } else {
1267 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1268 builders_or_callback,
1269 built_tees,
1270 next_stmt_id,
1271 );
1272
1273 let tee_ident =
1274 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1275
1276 built_tees.insert(
1277 inner.0.as_ref() as *const RefCell<HydroNode>,
1278 (tee_ident.clone(), inner_location_id),
1279 );
1280
1281 match builders_or_callback {
1282 BuildersOrCallback::Builders(graph_builders) => {
1283 let builder = graph_builders.entry(inner_location_id).or_default();
1284 builder.add_dfir(
1285 parse_quote! {
1286 #tee_ident = #inner_ident -> tee();
1287 },
1288 None,
1289 Some(&next_stmt_id.to_string()),
1290 );
1291 }
1292 BuildersOrCallback::Callback(_, node_callback) => {
1293 node_callback(self, next_stmt_id);
1294 }
1295 }
1296
1297 (tee_ident, inner_location_id)
1298 };
1299
1300 *next_stmt_id += 1;
1304 (ret_ident, inner_location_id)
1305 }
1306
1307 HydroNode::Chain { first, second, .. } => {
1308 let (first_ident, first_location_id) =
1309 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1310 let (second_ident, second_location_id) =
1311 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1312
1313 assert_eq!(
1314 first_location_id, second_location_id,
1315 "chain inputs must be in the same location"
1316 );
1317
1318 let chain_ident =
1319 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1320
1321 match builders_or_callback {
1322 BuildersOrCallback::Builders(graph_builders) => {
1323 let builder = graph_builders.entry(first_location_id).or_default();
1324 builder.add_dfir(
1325 parse_quote! {
1326 #chain_ident = chain();
1327 #first_ident -> [0]#chain_ident;
1328 #second_ident -> [1]#chain_ident;
1329 },
1330 None,
1331 Some(&next_stmt_id.to_string()),
1332 );
1333 }
1334 BuildersOrCallback::Callback(_, node_callback) => {
1335 node_callback(self, next_stmt_id);
1336 }
1337 }
1338
1339 *next_stmt_id += 1;
1340
1341 (chain_ident, first_location_id)
1342 }
1343
1344 HydroNode::CrossSingleton { left, right, .. } => {
1345 let (left_ident, left_location_id) =
1346 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1347 let (right_ident, right_location_id) =
1348 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1349
1350 assert_eq!(
1351 left_location_id, right_location_id,
1352 "cross_singleton inputs must be in the same location"
1353 );
1354
1355 let cross_ident =
1356 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1357
1358 match builders_or_callback {
1359 BuildersOrCallback::Builders(graph_builders) => {
1360 let builder = graph_builders.entry(left_location_id).or_default();
1361 builder.add_dfir(
1362 parse_quote! {
1363 #cross_ident = cross_singleton();
1364 #left_ident -> [input]#cross_ident;
1365 #right_ident -> [single]#cross_ident;
1366 },
1367 None,
1368 Some(&next_stmt_id.to_string()),
1369 );
1370 }
1371 BuildersOrCallback::Callback(_, node_callback) => {
1372 node_callback(self, next_stmt_id);
1373 }
1374 }
1375
1376 *next_stmt_id += 1;
1377
1378 (cross_ident, left_location_id)
1379 }
1380
1381 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1382 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1383 parse_quote!(cross_join_multiset)
1384 } else {
1385 parse_quote!(join_multiset)
1386 };
1387
1388 let (HydroNode::CrossProduct { left, right, .. }
1389 | HydroNode::Join { left, right, .. }) = self
1390 else {
1391 unreachable!()
1392 };
1393
1394 let (left_inner, left_lifetime) =
1395 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1396 (left, quote!('static))
1397 } else {
1398 (left, quote!('tick))
1399 };
1400
1401 let (right_inner, right_lifetime) =
1402 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1403 (right, quote!('static))
1404 } else {
1405 (right, quote!('tick))
1406 };
1407
1408 let (left_ident, left_location_id) =
1409 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1410 let (right_ident, right_location_id) =
1411 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1412
1413 assert_eq!(
1414 left_location_id, right_location_id,
1415 "join / cross product inputs must be in the same location"
1416 );
1417
1418 let stream_ident =
1419 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1420
1421 match builders_or_callback {
1422 BuildersOrCallback::Builders(graph_builders) => {
1423 let builder = graph_builders.entry(left_location_id).or_default();
1424 builder.add_dfir(
1425 parse_quote! {
1426 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1427 #left_ident -> [0]#stream_ident;
1428 #right_ident -> [1]#stream_ident;
1429 },
1430 None,
1431 Some(&next_stmt_id.to_string()),
1432 );
1433 }
1434 BuildersOrCallback::Callback(_, node_callback) => {
1435 node_callback(self, next_stmt_id);
1436 }
1437 }
1438
1439 *next_stmt_id += 1;
1440
1441 (stream_ident, left_location_id)
1442 }
1443
1444 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1445 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1446 parse_quote!(difference_multiset)
1447 } else {
1448 parse_quote!(anti_join_multiset)
1449 };
1450
1451 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1452 self
1453 else {
1454 unreachable!()
1455 };
1456
1457 let (neg, neg_lifetime) =
1458 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1459 (neg, quote!('static))
1460 } else {
1461 (neg, quote!('tick))
1462 };
1463
1464 let (pos_ident, pos_location_id) =
1465 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1466 let (neg_ident, neg_location_id) =
1467 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1468
1469 assert_eq!(
1470 pos_location_id, neg_location_id,
1471 "difference / anti join inputs must be in the same location"
1472 );
1473
1474 let stream_ident =
1475 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1476
1477 match builders_or_callback {
1478 BuildersOrCallback::Builders(graph_builders) => {
1479 let builder = graph_builders.entry(pos_location_id).or_default();
1480 builder.add_dfir(
1481 parse_quote! {
1482 #stream_ident = #operator::<'tick, #neg_lifetime>();
1483 #pos_ident -> [pos]#stream_ident;
1484 #neg_ident -> [neg]#stream_ident;
1485 },
1486 None,
1487 Some(&next_stmt_id.to_string()),
1488 );
1489 }
1490 BuildersOrCallback::Callback(_, node_callback) => {
1491 node_callback(self, next_stmt_id);
1492 }
1493 }
1494
1495 *next_stmt_id += 1;
1496
1497 (stream_ident, pos_location_id)
1498 }
1499
1500 HydroNode::ResolveFutures { input, .. } => {
1501 let (input_ident, input_location_id) =
1502 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1503
1504 let futures_ident =
1505 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1506
1507 match builders_or_callback {
1508 BuildersOrCallback::Builders(graph_builders) => {
1509 let builder = graph_builders.entry(input_location_id).or_default();
1510 builder.add_dfir(
1511 parse_quote! {
1512 #futures_ident = #input_ident -> resolve_futures();
1513 },
1514 None,
1515 Some(&next_stmt_id.to_string()),
1516 );
1517 }
1518 BuildersOrCallback::Callback(_, node_callback) => {
1519 node_callback(self, next_stmt_id);
1520 }
1521 }
1522
1523 *next_stmt_id += 1;
1524
1525 (futures_ident, input_location_id)
1526 }
1527
1528 HydroNode::ResolveFuturesOrdered { input, .. } => {
1529 let (input_ident, input_location_id) =
1530 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1531
1532 let futures_ident =
1533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1534
1535 match builders_or_callback {
1536 BuildersOrCallback::Builders(graph_builders) => {
1537 let builder = graph_builders.entry(input_location_id).or_default();
1538 builder.add_dfir(
1539 parse_quote! {
1540 #futures_ident = #input_ident -> resolve_futures_ordered();
1541 },
1542 None,
1543 Some(&next_stmt_id.to_string()),
1544 );
1545 }
1546 BuildersOrCallback::Callback(_, node_callback) => {
1547 node_callback(self, next_stmt_id);
1548 }
1549 }
1550
1551 *next_stmt_id += 1;
1552
1553 (futures_ident, input_location_id)
1554 }
1555
1556 HydroNode::Map { f, input, .. } => {
1557 let (input_ident, input_location_id) =
1558 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1559
1560 let map_ident =
1561 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1562
1563 match builders_or_callback {
1564 BuildersOrCallback::Builders(graph_builders) => {
1565 let builder = graph_builders.entry(input_location_id).or_default();
1566 builder.add_dfir(
1567 parse_quote! {
1568 #map_ident = #input_ident -> map(#f);
1569 },
1570 None,
1571 Some(&next_stmt_id.to_string()),
1572 );
1573 }
1574 BuildersOrCallback::Callback(_, node_callback) => {
1575 node_callback(self, next_stmt_id);
1576 }
1577 }
1578
1579 *next_stmt_id += 1;
1580
1581 (map_ident, input_location_id)
1582 }
1583
1584 HydroNode::FlatMap { f, input, .. } => {
1585 let (input_ident, input_location_id) =
1586 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1587
1588 let flat_map_ident =
1589 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1590
1591 match builders_or_callback {
1592 BuildersOrCallback::Builders(graph_builders) => {
1593 let builder = graph_builders.entry(input_location_id).or_default();
1594 builder.add_dfir(
1595 parse_quote! {
1596 #flat_map_ident = #input_ident -> flat_map(#f);
1597 },
1598 None,
1599 Some(&next_stmt_id.to_string()),
1600 );
1601 }
1602 BuildersOrCallback::Callback(_, node_callback) => {
1603 node_callback(self, next_stmt_id);
1604 }
1605 }
1606
1607 *next_stmt_id += 1;
1608
1609 (flat_map_ident, input_location_id)
1610 }
1611
1612 HydroNode::Filter { f, input, .. } => {
1613 let (input_ident, input_location_id) =
1614 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1615
1616 let filter_ident =
1617 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1618
1619 match builders_or_callback {
1620 BuildersOrCallback::Builders(graph_builders) => {
1621 let builder = graph_builders.entry(input_location_id).or_default();
1622 builder.add_dfir(
1623 parse_quote! {
1624 #filter_ident = #input_ident -> filter(#f);
1625 },
1626 None,
1627 Some(&next_stmt_id.to_string()),
1628 );
1629 }
1630 BuildersOrCallback::Callback(_, node_callback) => {
1631 node_callback(self, next_stmt_id);
1632 }
1633 }
1634
1635 *next_stmt_id += 1;
1636
1637 (filter_ident, input_location_id)
1638 }
1639
1640 HydroNode::FilterMap { f, input, .. } => {
1641 let (input_ident, input_location_id) =
1642 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1643
1644 let filter_map_ident =
1645 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1646
1647 match builders_or_callback {
1648 BuildersOrCallback::Builders(graph_builders) => {
1649 let builder = graph_builders.entry(input_location_id).or_default();
1650 builder.add_dfir(
1651 parse_quote! {
1652 #filter_map_ident = #input_ident -> filter_map(#f);
1653 },
1654 None,
1655 Some(&next_stmt_id.to_string()),
1656 );
1657 }
1658 BuildersOrCallback::Callback(_, node_callback) => {
1659 node_callback(self, next_stmt_id);
1660 }
1661 }
1662
1663 *next_stmt_id += 1;
1664
1665 (filter_map_ident, input_location_id)
1666 }
1667
1668 HydroNode::Sort { input, .. } => {
1669 let (input_ident, input_location_id) =
1670 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1671
1672 let sort_ident =
1673 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1674
1675 match builders_or_callback {
1676 BuildersOrCallback::Builders(graph_builders) => {
1677 let builder = graph_builders.entry(input_location_id).or_default();
1678 builder.add_dfir(
1679 parse_quote! {
1680 #sort_ident = #input_ident -> sort();
1681 },
1682 None,
1683 Some(&next_stmt_id.to_string()),
1684 );
1685 }
1686 BuildersOrCallback::Callback(_, node_callback) => {
1687 node_callback(self, next_stmt_id);
1688 }
1689 }
1690
1691 *next_stmt_id += 1;
1692
1693 (sort_ident, input_location_id)
1694 }
1695
1696 HydroNode::DeferTick { input, .. } => {
1697 let (input_ident, input_location_id) =
1698 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1699
1700 let defer_tick_ident =
1701 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1702
1703 match builders_or_callback {
1704 BuildersOrCallback::Builders(graph_builders) => {
1705 let builder = graph_builders.entry(input_location_id).or_default();
1706 builder.add_dfir(
1707 parse_quote! {
1708 #defer_tick_ident = #input_ident -> defer_tick_lazy();
1709 },
1710 None,
1711 Some(&next_stmt_id.to_string()),
1712 );
1713 }
1714 BuildersOrCallback::Callback(_, node_callback) => {
1715 node_callback(self, next_stmt_id);
1716 }
1717 }
1718
1719 *next_stmt_id += 1;
1720
1721 (defer_tick_ident, input_location_id)
1722 }
1723
1724 HydroNode::Enumerate {
1725 is_static, input, ..
1726 } => {
1727 let (input_ident, input_location_id) =
1728 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1729
1730 let enumerate_ident =
1731 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1732
1733 match builders_or_callback {
1734 BuildersOrCallback::Builders(graph_builders) => {
1735 let builder = graph_builders.entry(input_location_id).or_default();
1736 let lifetime = if *is_static {
1737 quote!('static)
1738 } else {
1739 quote!('tick)
1740 };
1741 builder.add_dfir(
1742 parse_quote! {
1743 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1744 },
1745 None,
1746 Some(&next_stmt_id.to_string()),
1747 );
1748 }
1749 BuildersOrCallback::Callback(_, node_callback) => {
1750 node_callback(self, next_stmt_id);
1751 }
1752 }
1753
1754 *next_stmt_id += 1;
1755
1756 (enumerate_ident, input_location_id)
1757 }
1758
1759 HydroNode::Inspect { f, input, .. } => {
1760 let (input_ident, input_location_id) =
1761 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1762
1763 let inspect_ident =
1764 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1765
1766 match builders_or_callback {
1767 BuildersOrCallback::Builders(graph_builders) => {
1768 let builder = graph_builders.entry(input_location_id).or_default();
1769 builder.add_dfir(
1770 parse_quote! {
1771 #inspect_ident = #input_ident -> inspect(#f);
1772 },
1773 None,
1774 Some(&next_stmt_id.to_string()),
1775 );
1776 }
1777 BuildersOrCallback::Callback(_, node_callback) => {
1778 node_callback(self, next_stmt_id);
1779 }
1780 }
1781
1782 *next_stmt_id += 1;
1783
1784 (inspect_ident, input_location_id)
1785 }
1786
1787 HydroNode::Unique { input, .. } => {
1788 let (input_ident, input_location_id) =
1789 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1790
1791 let unique_ident =
1792 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1793
1794 match builders_or_callback {
1795 BuildersOrCallback::Builders(graph_builders) => {
1796 let builder = graph_builders.entry(input_location_id).or_default();
1797 builder.add_dfir(
1798 parse_quote! {
1799 #unique_ident = #input_ident -> unique::<'tick>();
1800 },
1801 None,
1802 Some(&next_stmt_id.to_string()),
1803 );
1804 }
1805 BuildersOrCallback::Callback(_, node_callback) => {
1806 node_callback(self, next_stmt_id);
1807 }
1808 }
1809
1810 *next_stmt_id += 1;
1811
1812 (unique_ident, input_location_id)
1813 }
1814
1815 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1816 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1817 parse_quote!(fold)
1818 } else {
1819 parse_quote!(fold_keyed)
1820 };
1821
1822 let (HydroNode::Fold {
1823 init, acc, input, ..
1824 }
1825 | HydroNode::FoldKeyed {
1826 init, acc, input, ..
1827 }) = self
1828 else {
1829 unreachable!()
1830 };
1831
1832 let (input, lifetime) =
1833 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1834 (input, quote!('static))
1835 } else {
1836 (input, quote!('tick))
1837 };
1838
1839 let (input_ident, input_location_id) =
1840 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1841
1842 let fold_ident =
1843 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1844
1845 match builders_or_callback {
1846 BuildersOrCallback::Builders(graph_builders) => {
1847 let builder = graph_builders.entry(input_location_id).or_default();
1848 builder.add_dfir(
1849 parse_quote! {
1850 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1851 },
1852 None,
1853 Some(&next_stmt_id.to_string()),
1854 );
1855 }
1856 BuildersOrCallback::Callback(_, node_callback) => {
1857 node_callback(self, next_stmt_id);
1858 }
1859 }
1860
1861 *next_stmt_id += 1;
1862
1863 (fold_ident, input_location_id)
1864 }
1865
1866 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1867 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1868 parse_quote!(reduce)
1869 } else {
1870 parse_quote!(reduce_keyed)
1871 };
1872
1873 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1874 self
1875 else {
1876 unreachable!()
1877 };
1878
1879 let (input, lifetime) =
1880 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1881 (input, quote!('static))
1882 } else {
1883 (input, quote!('tick))
1884 };
1885
1886 let (input_ident, input_location_id) =
1887 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1888
1889 let reduce_ident =
1890 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1891
1892 match builders_or_callback {
1893 BuildersOrCallback::Builders(graph_builders) => {
1894 let builder = graph_builders.entry(input_location_id).or_default();
1895 builder.add_dfir(
1896 parse_quote! {
1897 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1898 },
1899 None,
1900 Some(&next_stmt_id.to_string()),
1901 );
1902 }
1903 BuildersOrCallback::Callback(_, node_callback) => {
1904 node_callback(self, next_stmt_id);
1905 }
1906 }
1907
1908 *next_stmt_id += 1;
1909
1910 (reduce_ident, input_location_id)
1911 }
1912
1913 HydroNode::Network {
1914 from_key: _,
1915 to_location,
1916 to_key: _,
1917 serialize_fn: serialize_pipeline,
1918 instantiate_fn,
1919 deserialize_fn: deserialize_pipeline,
1920 input,
1921 ..
1922 } => {
1923 let (input_ident, input_location_id) =
1924 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1925
1926 let to_id = match *to_location {
1927 LocationId::Process(id) => id,
1928 LocationId::Cluster(id) => id,
1929 LocationId::Tick(_, _) => panic!(),
1930 LocationId::ExternalProcess(id) => id,
1931 };
1932
1933 let receiver_stream_ident =
1934 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1935
1936 match builders_or_callback {
1937 BuildersOrCallback::Builders(graph_builders) => {
1938 let (sink_expr, source_expr) = match instantiate_fn {
1939 DebugInstantiate::Building => (
1940 syn::parse_quote!(DUMMY_SINK),
1941 syn::parse_quote!(DUMMY_SOURCE),
1942 ),
1943
1944 DebugInstantiate::Finalized(sink, source, _connect_fn) => {
1945 (sink.clone(), source.clone())
1946 }
1947 };
1948
1949 let sender_builder = graph_builders.entry(input_location_id).or_default();
1950 if let Some(serialize_pipeline) = serialize_pipeline {
1951 sender_builder.add_dfir(
1952 parse_quote! {
1953 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1954 },
1955 None,
1956 Some(&next_stmt_id.to_string()),
1957 );
1958 } else {
1959 sender_builder.add_dfir(
1960 parse_quote! {
1961 #input_ident -> dest_sink(#sink_expr);
1962 },
1963 None,
1964 Some(&next_stmt_id.to_string()),
1965 );
1966 }
1967
1968 let receiver_builder = graph_builders.entry(to_id).or_default();
1969 if let Some(deserialize_pipeline) = deserialize_pipeline {
1970 receiver_builder.add_dfir(parse_quote! {
1971 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1972 }, None, Some(&next_stmt_id.to_string()));
1973 } else {
1974 receiver_builder.add_dfir(
1975 parse_quote! {
1976 #receiver_stream_ident = source_stream(#source_expr);
1977 },
1978 None,
1979 Some(&next_stmt_id.to_string()),
1980 );
1981 }
1982 }
1983 BuildersOrCallback::Callback(_, node_callback) => {
1984 node_callback(self, next_stmt_id);
1985 }
1986 }
1987
1988 *next_stmt_id += 1;
1989
1990 (receiver_stream_ident, to_id)
1991 }
1992
1993 HydroNode::Counter {
1994 tag,
1995 duration,
1996 input,
1997 ..
1998 } => {
1999 let (input_ident, input_location_id) =
2000 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2001
2002 let counter_ident =
2003 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2004
2005 match builders_or_callback {
2006 BuildersOrCallback::Builders(graph_builders) => {
2007 let builder = graph_builders.entry(input_location_id).or_default();
2008 builder.add_dfir(
2009 parse_quote! {
2010 #counter_ident = #input_ident -> _counter(#tag, #duration);
2011 },
2012 None,
2013 Some(&next_stmt_id.to_string()),
2014 );
2015 }
2016 BuildersOrCallback::Callback(_, node_callback) => {
2017 node_callback(self, next_stmt_id);
2018 }
2019 }
2020
2021 *next_stmt_id += 1;
2022
2023 (counter_ident, input_location_id)
2024 }
2025 }
2026 }
2027
2028 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2029 match self {
2030 HydroNode::Placeholder => {
2031 panic!()
2032 }
2033 HydroNode::Source { source, .. } => match source {
2034 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2035 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2036 },
2037 HydroNode::CycleSource { .. }
2038 | HydroNode::Tee { .. }
2039 | HydroNode::Persist { .. }
2040 | HydroNode::Unpersist { .. }
2041 | HydroNode::Delta { .. }
2042 | HydroNode::Chain { .. }
2043 | HydroNode::CrossProduct { .. }
2044 | HydroNode::CrossSingleton { .. }
2045 | HydroNode::ResolveFutures { .. }
2046 | HydroNode::ResolveFuturesOrdered { .. }
2047 | HydroNode::Join { .. }
2048 | HydroNode::Difference { .. }
2049 | HydroNode::AntiJoin { .. }
2050 | HydroNode::DeferTick { .. }
2051 | HydroNode::Enumerate { .. }
2052 | HydroNode::Unique { .. }
2053 | HydroNode::Sort { .. } => {}
2054 HydroNode::Map { f, .. }
2055 | HydroNode::FlatMap { f, .. }
2056 | HydroNode::Filter { f, .. }
2057 | HydroNode::FilterMap { f, .. }
2058 | HydroNode::Inspect { f, .. }
2059 | HydroNode::Reduce { f, .. }
2060 | HydroNode::ReduceKeyed { f, .. } => {
2061 transform(f);
2062 }
2063 HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
2064 transform(init);
2065 transform(acc);
2066 }
2067 HydroNode::Network {
2068 serialize_fn,
2069 deserialize_fn,
2070 ..
2071 } => {
2072 if let Some(serialize_fn) = serialize_fn {
2073 transform(serialize_fn);
2074 }
2075 if let Some(deserialize_fn) = deserialize_fn {
2076 transform(deserialize_fn);
2077 }
2078 }
2079 HydroNode::Counter { duration, .. } => {
2080 transform(duration);
2081 }
2082 }
2083 }
2084
2085 pub fn metadata(&self) -> &HydroIrMetadata {
2086 match self {
2087 HydroNode::Placeholder => {
2088 panic!()
2089 }
2090 HydroNode::Source { metadata, .. } => metadata,
2091 HydroNode::CycleSource { metadata, .. } => metadata,
2092 HydroNode::Tee { metadata, .. } => metadata,
2093 HydroNode::Persist { metadata, .. } => metadata,
2094 HydroNode::Unpersist { metadata, .. } => metadata,
2095 HydroNode::Delta { metadata, .. } => metadata,
2096 HydroNode::Chain { metadata, .. } => metadata,
2097 HydroNode::CrossProduct { metadata, .. } => metadata,
2098 HydroNode::CrossSingleton { metadata, .. } => metadata,
2099 HydroNode::Join { metadata, .. } => metadata,
2100 HydroNode::Difference { metadata, .. } => metadata,
2101 HydroNode::AntiJoin { metadata, .. } => metadata,
2102 HydroNode::ResolveFutures { metadata, .. } => metadata,
2103 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2104 HydroNode::Map { metadata, .. } => metadata,
2105 HydroNode::FlatMap { metadata, .. } => metadata,
2106 HydroNode::Filter { metadata, .. } => metadata,
2107 HydroNode::FilterMap { metadata, .. } => metadata,
2108 HydroNode::DeferTick { metadata, .. } => metadata,
2109 HydroNode::Enumerate { metadata, .. } => metadata,
2110 HydroNode::Inspect { metadata, .. } => metadata,
2111 HydroNode::Unique { metadata, .. } => metadata,
2112 HydroNode::Sort { metadata, .. } => metadata,
2113 HydroNode::Fold { metadata, .. } => metadata,
2114 HydroNode::FoldKeyed { metadata, .. } => metadata,
2115 HydroNode::Reduce { metadata, .. } => metadata,
2116 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2117 HydroNode::Network { metadata, .. } => metadata,
2118 HydroNode::Counter { metadata, .. } => metadata,
2119 }
2120 }
2121
2122 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2123 match self {
2124 HydroNode::Placeholder => {
2125 panic!()
2126 }
2127 HydroNode::Source { metadata, .. } => metadata,
2128 HydroNode::CycleSource { metadata, .. } => metadata,
2129 HydroNode::Tee { metadata, .. } => metadata,
2130 HydroNode::Persist { metadata, .. } => metadata,
2131 HydroNode::Unpersist { metadata, .. } => metadata,
2132 HydroNode::Delta { metadata, .. } => metadata,
2133 HydroNode::Chain { metadata, .. } => metadata,
2134 HydroNode::CrossProduct { metadata, .. } => metadata,
2135 HydroNode::CrossSingleton { metadata, .. } => metadata,
2136 HydroNode::Join { metadata, .. } => metadata,
2137 HydroNode::Difference { metadata, .. } => metadata,
2138 HydroNode::AntiJoin { metadata, .. } => metadata,
2139 HydroNode::ResolveFutures { metadata, .. } => metadata,
2140 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2141 HydroNode::Map { metadata, .. } => metadata,
2142 HydroNode::FlatMap { metadata, .. } => metadata,
2143 HydroNode::Filter { metadata, .. } => metadata,
2144 HydroNode::FilterMap { metadata, .. } => metadata,
2145 HydroNode::DeferTick { metadata, .. } => metadata,
2146 HydroNode::Enumerate { metadata, .. } => metadata,
2147 HydroNode::Inspect { metadata, .. } => metadata,
2148 HydroNode::Unique { metadata, .. } => metadata,
2149 HydroNode::Sort { metadata, .. } => metadata,
2150 HydroNode::Fold { metadata, .. } => metadata,
2151 HydroNode::FoldKeyed { metadata, .. } => metadata,
2152 HydroNode::Reduce { metadata, .. } => metadata,
2153 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2154 HydroNode::Network { metadata, .. } => metadata,
2155 HydroNode::Counter { metadata, .. } => metadata,
2156 }
2157 }
2158
2159 pub fn print_root(&self) -> String {
2160 match self {
2161 HydroNode::Placeholder => {
2162 panic!()
2163 }
2164 HydroNode::Source { source, .. } => format!("Source({:?})", source),
2165 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2166 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2167 HydroNode::Persist { .. } => "Persist()".to_string(),
2168 HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2169 HydroNode::Delta { .. } => "Delta()".to_string(),
2170 HydroNode::Chain { first, second, .. } => {
2171 format!("Chain({}, {})", first.print_root(), second.print_root())
2172 }
2173 HydroNode::CrossProduct { left, right, .. } => {
2174 format!(
2175 "CrossProduct({}, {})",
2176 left.print_root(),
2177 right.print_root()
2178 )
2179 }
2180 HydroNode::CrossSingleton { left, right, .. } => {
2181 format!(
2182 "CrossSingleton({}, {})",
2183 left.print_root(),
2184 right.print_root()
2185 )
2186 }
2187 HydroNode::Join { left, right, .. } => {
2188 format!("Join({}, {})", left.print_root(), right.print_root())
2189 }
2190 HydroNode::Difference { pos, neg, .. } => {
2191 format!("Difference({}, {})", pos.print_root(), neg.print_root())
2192 }
2193 HydroNode::AntiJoin { pos, neg, .. } => {
2194 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2195 }
2196 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2197 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2198 HydroNode::Map { f, .. } => format!("Map({:?})", f),
2199 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2200 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2201 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2202 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2203 HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2204 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2205 HydroNode::Unique { .. } => "Unique()".to_string(),
2206 HydroNode::Sort { .. } => "Sort()".to_string(),
2207 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2208 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2209 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2210 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2211 HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2212 HydroNode::Counter { tag, duration, .. } => {
2213 format!("Counter({:?}, {:?})", tag, duration)
2214 }
2215 }
2216 }
2217}
2218
2219#[cfg(feature = "build")]
2220#[expect(clippy::too_many_arguments, reason = "networking internals")]
2221fn instantiate_network<'a, D: Deploy<'a>>(
2222 from_location: &LocationId,
2223 from_key: Option<usize>,
2224 to_location: &LocationId,
2225 to_key: Option<usize>,
2226 nodes: &HashMap<usize, D::Process>,
2227 clusters: &HashMap<usize, D::Cluster>,
2228 externals: &HashMap<usize, D::ExternalProcess>,
2229 compile_env: &D::CompileEnv,
2230) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>) {
2231 let ((sink, source), connect_fn) = match (from_location, to_location) {
2232 (LocationId::Process(from), LocationId::Process(to)) => {
2233 let from_node = nodes
2234 .get(from)
2235 .unwrap_or_else(|| {
2236 panic!("A process used in the graph was not instantiated: {}", from)
2237 })
2238 .clone();
2239 let to_node = nodes
2240 .get(to)
2241 .unwrap_or_else(|| {
2242 panic!("A process used in the graph was not instantiated: {}", to)
2243 })
2244 .clone();
2245
2246 let sink_port = D::allocate_process_port(&from_node);
2247 let source_port = D::allocate_process_port(&to_node);
2248
2249 (
2250 D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2251 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
2252 )
2253 }
2254 (LocationId::Process(from), LocationId::Cluster(to)) => {
2255 let from_node = nodes
2256 .get(from)
2257 .unwrap_or_else(|| {
2258 panic!("A process used in the graph was not instantiated: {}", from)
2259 })
2260 .clone();
2261 let to_node = clusters
2262 .get(to)
2263 .unwrap_or_else(|| {
2264 panic!("A cluster used in the graph was not instantiated: {}", to)
2265 })
2266 .clone();
2267
2268 let sink_port = D::allocate_process_port(&from_node);
2269 let source_port = D::allocate_cluster_port(&to_node);
2270
2271 (
2272 D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2273 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
2274 )
2275 }
2276 (LocationId::Cluster(from), LocationId::Process(to)) => {
2277 let from_node = clusters
2278 .get(from)
2279 .unwrap_or_else(|| {
2280 panic!("A cluster used in the graph was not instantiated: {}", from)
2281 })
2282 .clone();
2283 let to_node = nodes
2284 .get(to)
2285 .unwrap_or_else(|| {
2286 panic!("A process used in the graph was not instantiated: {}", to)
2287 })
2288 .clone();
2289
2290 let sink_port = D::allocate_cluster_port(&from_node);
2291 let source_port = D::allocate_process_port(&to_node);
2292
2293 (
2294 D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2295 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
2296 )
2297 }
2298 (LocationId::Cluster(from), LocationId::Cluster(to)) => {
2299 let from_node = clusters
2300 .get(from)
2301 .unwrap_or_else(|| {
2302 panic!("A cluster used in the graph was not instantiated: {}", from)
2303 })
2304 .clone();
2305 let to_node = clusters
2306 .get(to)
2307 .unwrap_or_else(|| {
2308 panic!("A cluster used in the graph was not instantiated: {}", to)
2309 })
2310 .clone();
2311
2312 let sink_port = D::allocate_cluster_port(&from_node);
2313 let source_port = D::allocate_cluster_port(&to_node);
2314
2315 (
2316 D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2317 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
2318 )
2319 }
2320 (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
2321 let from_node = externals
2322 .get(from)
2323 .unwrap_or_else(|| {
2324 panic!(
2325 "A external used in the graph was not instantiated: {}",
2326 from
2327 )
2328 })
2329 .clone();
2330
2331 let to_node = nodes
2332 .get(to)
2333 .unwrap_or_else(|| {
2334 panic!("A process used in the graph was not instantiated: {}", to)
2335 })
2336 .clone();
2337
2338 let sink_port = D::allocate_external_port(&from_node);
2339 let source_port = D::allocate_process_port(&to_node);
2340
2341 from_node.register(from_key.unwrap(), sink_port.clone());
2342
2343 (
2344 (
2345 parse_quote!(DUMMY),
2346 D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2347 ),
2348 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
2349 )
2350 }
2351 (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
2352 todo!("NYI")
2353 }
2354 (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
2355 panic!("Cannot send from external to external")
2356 }
2357 (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
2358 let from_node = nodes
2359 .get(from)
2360 .unwrap_or_else(|| {
2361 panic!("A process used in the graph was not instantiated: {}", from)
2362 })
2363 .clone();
2364
2365 let to_node = externals
2366 .get(to)
2367 .unwrap_or_else(|| {
2368 panic!("A external used in the graph was not instantiated: {}", to)
2369 })
2370 .clone();
2371
2372 let sink_port = D::allocate_process_port(&from_node);
2373 let source_port = D::allocate_external_port(&to_node);
2374
2375 to_node.register(to_key.unwrap(), source_port.clone());
2376
2377 (
2378 (
2379 D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
2380 parse_quote!(DUMMY),
2381 ),
2382 D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
2383 )
2384 }
2385 (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
2386 todo!("NYI")
2387 }
2388 (LocationId::Tick(_, _), _) => panic!(),
2389 (_, LocationId::Tick(_, _)) => panic!(),
2390 };
2391 (sink, source, connect_fn)
2392}