hydro_lang/rewrites/
insert_counter.rs

1use std::time::Duration;
2
3use stageleft::Quoted;
4
5use crate::ir::*;
6
7fn insert_counter_node(node: &mut HydroNode, next_stmt_id: &mut usize, duration: syn::Expr) {
8    match node {
9        HydroNode::Placeholder
10        | HydroNode::Unpersist { .. }
11        | HydroNode::Counter { .. } => {
12            std::panic!("Unexpected {:?} found in insert_counter_node", node.print_root());
13        }
14        HydroNode::Source { metadata, .. }
15        | HydroNode::CycleSource { metadata, .. }
16        | HydroNode::Persist { metadata, .. }
17        | HydroNode::Delta { metadata, .. }
18        | HydroNode::Chain { metadata, .. } // Can technically be derived by summing parent cardinalities
19        | HydroNode::CrossSingleton { metadata, .. }
20        | HydroNode::CrossProduct { metadata, .. } // Can technically be derived by multiplying parent cardinalities
21        | HydroNode::Join { metadata, .. }
22        | HydroNode::ResolveFutures { metadata, .. }
23        | HydroNode::ResolveFuturesOrdered { metadata, .. }
24        | HydroNode::Difference { metadata, .. }
25        | HydroNode::AntiJoin { metadata, .. }
26        | HydroNode::FlatMap { metadata, .. }
27        | HydroNode::Filter { metadata, .. }
28        | HydroNode::FilterMap { metadata, .. }
29        | HydroNode::Unique { metadata, .. }
30        | HydroNode::Fold { metadata, .. } // Output 1 value per tick
31        | HydroNode::Reduce { metadata, .. } // Output 1 value per tick
32        | HydroNode::FoldKeyed { metadata, .. }
33        | HydroNode::ReduceKeyed { metadata, .. }
34        | HydroNode::Network { metadata, .. }
35         => {
36            let metadata = metadata.clone();
37            let node_content = std::mem::replace(node, HydroNode::Placeholder);
38
39            let counter = HydroNode::Counter {
40                tag: next_stmt_id.to_string(),
41                duration: duration.into(),
42                input: Box::new(node_content),
43                metadata: metadata.clone(),
44            };
45
46            // when we emit this IR, the counter will bump the stmt id, so simulate that here
47            *next_stmt_id += 1;
48
49            *node = counter;
50        }
51        HydroNode::Tee { .. } // Do nothing, we will count the parent of the Tee
52        | HydroNode::Map { .. } // Equal to parent cardinality
53        | HydroNode::DeferTick { .. } // Equal to parent cardinality
54        | HydroNode::Enumerate { .. }
55        | HydroNode::Inspect { .. }
56        | HydroNode::Sort { .. }
57         => {}
58    }
59}
60
61pub fn insert_counter(ir: &mut [HydroLeaf], duration: impl Quoted<'static, Duration>) {
62    let duration = duration.splice_typed();
63    traverse_dfir(
64        ir,
65        |_, _| {},
66        |node, next_stmt_id| {
67            insert_counter_node(node, next_stmt_id, duration.clone());
68        },
69    );
70}