// hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21 CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// See [`NoOrder`] for the weaker alternative.
pub struct TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// See [`TotalOrder`] for the stronger alternative.
pub struct NoOrder {}
37
/// Helper trait for determining the weakest of two orderings.
///
/// Used as a bound (e.g. `Order: MinOrder<NoOrder, Min = NoOrder>`) to
/// constrain operators to the combined ordering guarantee of their inputs.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}
44
// The minimum of any ordering with itself is itself.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}
49
// `NoOrder` is weaker than `TotalOrder`, so it is the minimum of the pair.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}
54
// Symmetric case: the minimum of `NoOrder` and `TotalOrder` is `NoOrder`.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
59
/// An ordered sequence stream of elements of type `T`.
///
/// Type Parameters:
/// - `T`: the type of elements in the stream
/// - `L`: the location where the stream is being materialized
/// - `B`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
pub struct Stream<T, L, B, Order = TotalOrder> {
    // The location (process, cluster, or tick) at which this stream lives.
    location: L,
    // The dataflow IR node materializing this stream. Wrapped in a `RefCell`
    // so that `Clone` can retarget it to a shared `Tee` node in place.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries `T`, `B`, and `Order`, which have no runtime representation.
    _phantom: PhantomData<(T, L, B, Order)>,
}
75
76impl<'a, T, L: Location<'a>, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O> {
77 fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
78 Stream {
79 location: stream.location,
80 ir_node: stream.ir_node,
81 _phantom: PhantomData,
82 }
83 }
84}
85
86impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder> {
87 fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
88 Stream {
89 location: stream.location,
90 ir_node: stream.ir_node,
91 _phantom: PhantomData,
92 }
93 }
94}
95
impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
    /// Returns the [`LocationId`] of the location this stream lives at.
    fn location_kind(&self) -> LocationId {
        self.location.id()
    }
}
101
impl<'a, T, L: Location<'a>, Order> DeferTick for Stream<T, Tick<L>, Bounded, Order> {
    fn defer_tick(self) -> Self {
        // Fully-qualified call delegates to the inherent `Stream::defer_tick`
        // (defined elsewhere in this file).
        Stream::defer_tick(self)
    }
}
107
108impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker>
109 for Stream<T, Tick<L>, Bounded, Order>
110{
111 type Location = Tick<L>;
112
113 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
114 let location_id = location.id();
115 Stream::new(
116 location.clone(),
117 HydroNode::CycleSource {
118 ident,
119 location_kind: location_id,
120 metadata: location.new_node_metadata::<T>(),
121 },
122 )
123 }
124}
125
126impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker>
127 for Stream<T, Tick<L>, Bounded, Order>
128{
129 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
130 assert_eq!(
131 self.location.id(),
132 expected_location,
133 "locations do not match"
134 );
135 self.location
136 .flow_state()
137 .borrow_mut()
138 .leaves
139 .as_mut()
140 .expect(FLOW_USED_MESSAGE)
141 .push(HydroLeaf::CycleSink {
142 ident,
143 location_kind: self.location_kind(),
144 input: Box::new(self.ir_node.into_inner()),
145 metadata: self.location.new_node_metadata::<T>(),
146 });
147 }
148}
149
impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker>
    for Stream<T, L, B, Order>
{
    type Location = L;

    /// Creates the read side of a forward reference at a top-level location.
    fn create_source(ident: syn::Ident, location: L) -> Self {
        let location_id = location.id();

        // Top-level (non-tick) streams are wrapped in `Persist`; the matching
        // `complete` below strips it with `Unpersist`, so the pair must stay
        // symmetric.
        Stream::new(
            location.clone(),
            HydroNode::Persist {
                inner: Box::new(HydroNode::CycleSource {
                    ident,
                    location_kind: location_id,
                    metadata: location.new_node_metadata::<T>(),
                }),
                metadata: location.new_node_metadata::<T>(),
            },
        )
    }
}
171
impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker>
    for Stream<T, L, B, Order>
{
    /// Registers this stream as the sink feeding the forward reference
    /// `ident`. Panics if the stream lives at a different location than the
    /// reference was declared at.
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            self.location.id(),
            expected_location,
            "locations do not match"
        );
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::CycleSink {
                ident,
                location_kind: self.location_kind(),
                // Strip the `Persist` added by `create_source`, re-exposing
                // the underlying values for the sink.
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                metadata,
            });
    }
}
199
200impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
201 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
202 Stream {
203 location,
204 ir_node: RefCell::new(ir_node),
205 _phantom: PhantomData,
206 }
207 }
208}
209
impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream<T, L, B, Order> {
    fn clone(&self) -> Self {
        // If this stream's node is not already a `Tee`, convert it in place:
        // move the original node out (leaving a `Placeholder` momentarily)
        // and rewrap it behind a shared, ref-counted `TeeNode` so that `self`
        // and every clone read from the same underlying node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        // The node is now guaranteed to be a `Tee`; hand out a new `Tee`
        // sharing the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
235
236impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
237 /// Produces a stream based on invoking `f` on each element in order.
238 /// If you do not want to modify the stream and instead only want to view
239 /// each item use [`Stream::inspect`] instead.
240 ///
241 /// # Example
242 /// ```rust
243 /// # use hydro_lang::*;
244 /// # use futures::StreamExt;
245 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
246 /// let words = process.source_iter(q!(vec!["hello", "world"]));
247 /// words.map(q!(|x| x.to_uppercase()))
248 /// # }, |mut stream| async move {
249 /// # for w in vec!["HELLO", "WORLD"] {
250 /// # assert_eq!(stream.next().await.unwrap(), w);
251 /// # }
252 /// # }));
253 /// ```
254 pub fn map<U, F: Fn(T) -> U + 'a>(
255 self,
256 f: impl IntoQuotedMut<'a, F, L>,
257 ) -> Stream<U, L, B, Order> {
258 let f = f.splice_fn1_ctx(&self.location).into();
259 Stream::new(
260 self.location.clone(),
261 HydroNode::Map {
262 f,
263 input: Box::new(self.ir_node.into_inner()),
264 metadata: self.location.new_node_metadata::<U>(),
265 },
266 )
267 }
268
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, Order> {
        // Stage the user closure in the context of this stream's location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }
305
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// Because the per-element order is not deterministic, the output stream
    /// is downgraded to [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder> {
        // Stage the user closure in the context of this stream's location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }
344
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, Order>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to flatten.
        self.flat_map_ordered(q!(|d| d))
    }
371
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to flatten.
        self.flat_map_unordered(q!(|d| d))
    }
401
402 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
403 /// `f`, preserving the order of the elements.
404 ///
405 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
406 /// not modify or take ownership of the values. If you need to modify the values while filtering
407 /// use [`Stream::filter_map`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::*;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
414 /// process
415 /// .source_iter(q!(vec![1, 2, 3, 4]))
416 /// .filter(q!(|&x| x > 2))
417 /// # }, |mut stream| async move {
418 /// // 3, 4
419 /// # for w in (3..5) {
420 /// # assert_eq!(stream.next().await.unwrap(), w);
421 /// # }
422 /// # }));
423 /// ```
424 pub fn filter<F: Fn(&T) -> bool + 'a>(
425 self,
426 f: impl IntoQuotedMut<'a, F, L>,
427 ) -> Stream<T, L, B, Order> {
428 let f = f.splice_fn1_borrow_ctx(&self.location).into();
429 Stream::new(
430 self.location.clone(),
431 HydroNode::Filter {
432 f,
433 input: Box::new(self.ir_node.into_inner()),
434 metadata: self.location.new_node_metadata::<T>(),
435 },
436 )
437 }
438
    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, Order> {
        // Stage the user closure in the context of this stream's location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }
469
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2, 3, 4]))
    ///         .tick_batch(&tick)
    /// };
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cross_singleton<O>(
        self,
        other: impl Into<Optional<O, L, Bounded>>,
    ) -> Stream<(T, O), L, B, Order>
    where
        O: Clone,
    {
        let other: Optional<O, L, Bounded> = other.into();
        // Both operands must live at the same location to be combined.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O)>(),
            },
        )
    }
511
512 /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
513 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
514 self.cross_singleton(signal.map(q!(|_u| ())))
515 .map(q!(|(d, _signal)| d))
516 }
517
518 /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
519 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
520 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
521 }
522
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn cross_product<O>(self, other: Stream<O, L, B, Order>) -> Stream<(T, O), L, B, NoOrder>
    where
        T: Clone,
        O: Clone,
    {
        // Both operands must live at the same location to be combined.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O)>(),
            },
        )
    }
556
    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn unique(self) -> Stream<T, L, B, Order>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
584
    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///     process
    ///         .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    ///         .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2]))
    ///         .tick_batch(&tick)
    /// };
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order>
    where
        T: Eq + Hash,
    {
        // Both operands must live at the same location to be combined.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                // `pos` keeps elements, `neg` subtracts them.
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
626
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn inspect<F: Fn(&T) + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<T, L, B, Order> {
        // Stage the borrowing closure in the context of this location.
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        if L::is_top_level() {
            // Top-level streams are wrapped in `Persist` (see the note in
            // `fold_commutative`); unwrap with `Unpersist`, inspect, then
            // re-`Persist` to preserve that representation.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Inspect {
                        f,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<T>(),
                    }),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        } else {
            // Inside a tick, no persistence wrapper is needed.
            Stream::new(
                self.location.clone(),
                HydroNode::Inspect {
                    f,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        }
    }
677
    /// Explicitly "casts" the stream to a type with a different ordering
    /// guarantee. Useful in unsafe code where the ordering cannot be proven
    /// by the type-system.
    ///
    /// # Safety
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided ordering guarantee will propagate into the guarantees
    /// for the rest of the program.
    ///
    /// # Example
    /// (TODO: more sensible code after Shadaj merges)
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!({
    ///     let now = std::time::SystemTime::now();
    ///     match now.elapsed().unwrap().as_secs() % 2 {
    ///         0 => vec![5, 4, 3, 2, 1],
    ///         _ => vec![1, 2, 3, 4, 5],
    ///     }
    ///     .into_iter()
    /// }));
    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
    /// stream
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4, 5] {
    /// #     assert!((1..=5).contains(&stream.next().await.unwrap()));
    /// # }
    /// # }));
    /// ```
    pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O> {
        // Pure type-level cast: the underlying IR node is reused unchanged.
        Stream::new(self.location, self.ir_node.into_inner())
    }
714}
715
impl<'a, T, L: Location<'a>, B, Order> Stream<&T, L, B, Order> {
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, Order>
    where
        T: Clone,
    {
        // `q!` stages the clone so it runs inside the generated dataflow.
        self.map(q!(|d| d.clone()))
    }
}
739
740impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
741where
742 Order: MinOrder<NoOrder, Min = NoOrder>,
743{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 10
    /// # assert_eq!(stream.next().await.unwrap(), 10);
    /// # }));
    /// ```
    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B> {
        // Stage both closures in the context of this stream's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let mut core = HydroNode::Fold {
            init,
            acc: comb,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<A>(),
        };

        if L::is_top_level() {
            // top-level (possibly unbounded) singletons are represented as
            // a stream which produces all values from all ticks every tick,
            // so Unpersist will always give the latest aggregation
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<A>(),
            };
        }

        Singleton::new(self.location, core)
    }
793
794 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
795 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
796 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
797 /// reference, so that it can be modified in place.
798 ///
799 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
800 ///
801 /// # Example
802 /// ```rust
803 /// # use hydro_lang::*;
804 /// # use futures::StreamExt;
805 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
806 /// let tick = process.tick();
807 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
808 /// let batch = unsafe { numbers.tick_batch(&tick) };
809 /// batch
810 /// .reduce_commutative(q!(|curr, new| *curr += new))
811 /// .all_ticks()
812 /// # }, |mut stream| async move {
813 /// // 10
814 /// # assert_eq!(stream.next().await.unwrap(), 10);
815 /// # }));
816 /// ```
817 pub fn reduce_commutative<F: Fn(&mut T, T) + 'a>(
818 self,
819 comb: impl IntoQuotedMut<'a, F, L>,
820 ) -> Optional<T, L, B> {
821 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
822 let mut core = HydroNode::Reduce {
823 f,
824 input: Box::new(self.ir_node.into_inner()),
825 metadata: self.location.new_node_metadata::<T>(),
826 };
827
828 if L::is_top_level() {
829 core = HydroNode::Persist {
830 inner: Box::new(core),
831 metadata: self.location.new_node_metadata::<T>(),
832 };
833 }
834
835 Optional::new(self.location, core)
836 }
837
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Taking a maximum is commutative, so `reduce_commutative` is safe
        // even without an ordering guarantee on the stream.
        self.reduce_commutative(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }
865
    /// Computes the maximum element in the stream as an [`Optional`], where the
    /// maximum is determined according to the `key` function. The [`Optional`] will
    /// be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max_by_key(q!(|x| -x)).all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>(
        self,
        key: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Optional<T, L, B> {
        let f = key.splice_fn1_borrow_ctx(&self.location);

        // Hand-build the staged reduce closure: bind the key function once
        // outside the closure so the user expression is spliced (and
        // evaluated) a single time, then keep whichever element has the
        // larger key.
        let wrapped: syn::Expr = parse_quote!({
            let key_fn = #f;
            move |curr, new| {
                if key_fn(&new) > key_fn(&*curr) {
                    *curr = new;
                }
            }
        });

        let mut core = HydroNode::Reduce {
            f: wrapped.into(),
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<T>(),
        };

        if L::is_top_level() {
            // Top-level results replay the aggregate every tick (see the note
            // in `fold_commutative`), so wrap in `Persist`.
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        Optional::new(self.location, core)
    }
914
    /// Computes the minimum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.min().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Taking a minimum is commutative, so `reduce_commutative` is safe
        // even without an ordering guarantee on the stream.
        self.reduce_commutative(q!(|curr, new| {
            if new < *curr {
                *curr = new;
            }
        }))
    }
942
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        // Counting is insensitive to element order, so the commutative fold
        // is safe here.
        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
    }
962}
963
964impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
    /// Returns a stream with the current count tupled with each element in the input stream.
    ///
    /// Requires a [`TotalOrder`] input, since the assigned indices depend on
    /// the order in which elements arrive.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
        if L::is_top_level() {
            // Top-level streams are wrapped in `Persist`: unwrap, enumerate,
            // then re-persist. `is_static: true` here vs. `false` below —
            // presumably marks that the counter must survive the persisted
            // replay; NOTE(review): confirm against the IR semantics.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Enumerate {
                        is_static: true,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<(usize, T)>(),
                    }),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::Enumerate {
                    is_static: false,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        }
    }
1009
1010 /// Computes the first element in the stream as an [`Optional`], which
1011 /// will be empty until the first element in the input arrives.
1012 ///
1013 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1014 /// re-ordering of elements may cause the first element to change.
1015 ///
1016 /// # Example
1017 /// ```rust
1018 /// # use hydro_lang::*;
1019 /// # use futures::StreamExt;
1020 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1021 /// let tick = process.tick();
1022 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1023 /// let batch = unsafe { numbers.tick_batch(&tick) };
1024 /// batch.first().all_ticks()
1025 /// # }, |mut stream| async move {
1026 /// // 1
1027 /// # assert_eq!(stream.next().await.unwrap(), 1);
1028 /// # }));
1029 /// ```
1030 pub fn first(self) -> Optional<T, L, B> {
1031 Optional::new(self.location, self.ir_node.into_inner())
1032 }
1033
1034 /// Computes the last element in the stream as an [`Optional`], which
1035 /// will be empty until an element in the input arrives.
1036 ///
1037 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1038 /// re-ordering of elements may cause the last element to change.
1039 ///
1040 /// # Example
1041 /// ```rust
1042 /// # use hydro_lang::*;
1043 /// # use futures::StreamExt;
1044 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1045 /// let tick = process.tick();
1046 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1047 /// let batch = unsafe { numbers.tick_batch(&tick) };
1048 /// batch.last().all_ticks()
1049 /// # }, |mut stream| async move {
1050 /// // 4
1051 /// # assert_eq!(stream.next().await.unwrap(), 4);
1052 /// # }));
1053 /// ```
1054 pub fn last(self) -> Optional<T, L, B> {
1055 self.reduce(q!(|curr, new| *curr = new))
1056 }
1057
1058 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1059 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1060 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1061 ///
1062 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1063 /// to depend on the order of elements in the stream.
1064 ///
1065 /// # Example
1066 /// ```rust
1067 /// # use hydro_lang::*;
1068 /// # use futures::StreamExt;
1069 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1070 /// let tick = process.tick();
1071 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1072 /// let batch = unsafe { words.tick_batch(&tick) };
1073 /// batch
1074 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1075 /// .all_ticks()
1076 /// # }, |mut stream| async move {
1077 /// // "HELLOWORLD"
1078 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1079 /// # }));
1080 /// ```
1081 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1082 self,
1083 init: impl IntoQuotedMut<'a, I, L>,
1084 comb: impl IntoQuotedMut<'a, F, L>,
1085 ) -> Singleton<A, L, B> {
1086 let init = init.splice_fn0_ctx(&self.location).into();
1087 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1088
1089 let mut core = HydroNode::Fold {
1090 init,
1091 acc: comb,
1092 input: Box::new(self.ir_node.into_inner()),
1093 metadata: self.location.new_node_metadata::<A>(),
1094 };
1095
1096 if L::is_top_level() {
1097 // top-level (possibly unbounded) singletons are represented as
1098 // a stream which produces all values from all ticks every tick,
1099 // so Unpersist will always give the lastest aggregation
1100 core = HydroNode::Persist {
1101 inner: Box::new(core),
1102 metadata: self.location.new_node_metadata::<A>(),
1103 };
1104 }
1105
1106 Singleton::new(self.location, core)
1107 }
1108
1109 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1110 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1111 /// until the first element in the input arrives.
1112 ///
1113 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1114 /// to depend on the order of elements in the stream.
1115 ///
1116 /// # Example
1117 /// ```rust
1118 /// # use hydro_lang::*;
1119 /// # use futures::StreamExt;
1120 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1121 /// let tick = process.tick();
1122 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1123 /// let batch = unsafe { words.tick_batch(&tick) };
1124 /// batch
1125 /// .map(q!(|x| x.to_string()))
1126 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1127 /// .all_ticks()
1128 /// # }, |mut stream| async move {
1129 /// // "HELLOWORLD"
1130 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1131 /// # }));
1132 /// ```
1133 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1134 self,
1135 comb: impl IntoQuotedMut<'a, F, L>,
1136 ) -> Optional<T, L, B> {
1137 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1138 let mut core = HydroNode::Reduce {
1139 f,
1140 input: Box::new(self.ir_node.into_inner()),
1141 metadata: self.location.new_node_metadata::<T>(),
1142 };
1143
1144 if L::is_top_level() {
1145 core = HydroNode::Persist {
1146 inner: Box::new(core),
1147 metadata: self.location.new_node_metadata::<T>(),
1148 };
1149 }
1150
1151 Optional::new(self.location, core)
1152 }
1153}
1154
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream<T, L, Unbounded, O> {
    /// Produces a new stream that interleaves the elements of the two input streams.
    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn union<O2>(self, other: Stream<T, L, Unbounded, O2>) -> Stream<T, L, Unbounded, NoOrder> {
        // Use a fresh tick to batch both inputs so their per-tick batches can
        // be concatenated, then release the result back as an unbounded stream.
        let tick = self.location.tick();
        unsafe {
            // SAFETY: Because the outputs are unordered,
            // we can interleave batches from both streams.
            self.tick_batch(&tick)
                .assume_ordering::<NoOrder>()
                .chain(other.tick_batch(&tick).assume_ordering::<NoOrder>())
                .all_ticks()
                .assume_ordering()
        }
    }
}
1189
impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
    where
        T: Ord,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Sort {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order::Min>
    where
        // The output carries the weaker of the two input orderings.
        Order: MinOrder<O2>,
    {
        // Both streams must live at the same location; panics otherwise.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1268
impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> {
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
    where
        K: Eq + Hash,
    {
        // Both sides must live at the same location; panics otherwise.
        check_matching_location(&self.location, &n.location);

        // Join output order depends on match timing, so it is always NoOrder.
        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///     process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2]))
    ///         .tick_batch(&tick)
    /// };
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, Order>
    where
        K: Eq + Hash,
    {
        // Both sides must live at the same location; panics otherwise.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V1)>(),
            },
        )
    }
}
1347
impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded> {
        // Each group's accumulator starts from `init` and is updated in
        // place by `comb` for every value with a matching key.
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, A)>(),
            },
        )
    }

    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>(
        self,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, V), Tick<L>, Bounded> {
        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::ReduceKeyed {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V)>(),
            },
        )
    }
}
1435
impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order> {
    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed_commutative`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded, Order> {
        // Same IR node as `fold_keyed`; only the type-level ordering
        // requirement differs (any Order is accepted here because `comb`
        // is declared commutative).
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, A)>(),
            },
        )
    }

    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order> {
        // Group by key with a unit accumulator (deduplicates the keys),
        // then drop the unit values.
        self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
            .map(q!(|(k, _)| k))
    }

    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>(
        self,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, V), Tick<L>, Bounded, Order> {
        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::ReduceKeyed {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V)>(),
            },
        )
    }
}
1544
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Atomic<L>, B, Order> {
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            // Move into the tick that this atomic section is associated with.
            self.location.clone().tick,
            HydroNode::Unpersist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Exits the atomic section, returning the stream at the underlying
    /// (non-atomic) location with its boundedness and ordering unchanged.
    pub fn end_atomic(self) -> Stream<T, L, B, Order> {
        Stream::new(self.location.tick.l, self.ir_node.into_inner())
    }

    /// Returns the [`Tick`] within which this stream is being atomically processed.
    pub fn atomic_source(&self) -> Tick<L> {
        self.location.tick.clone()
    }
}
1570
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B, Order> Stream<T, L, B, Order> {
    /// Enters an atomic section associated with `tick`, marking this stream's
    /// elements for atomic processing with respect to that tick.
    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, Order> {
        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder>
    where
        T: Future<Output = T2>,
    {
        // Completion order is arbitrary, hence the NoOrder output.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T2>(),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures_ordered<T2>(self) -> Stream<T2, L, B, Order>
    where
        T: Future<Output = T2>,
    {
        // Outputs follow input arrival order, so the input ordering is kept.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T2>(),
            },
        )
    }

    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, Order> {
        unsafe { self.atomic(tick).tick_batch() }
    }

    /// Given a time interval, returns a stream corresponding to samples taken from the
    /// stream roughly at that interval. The output will have elements in the same order
    /// as the input, but with arbitrary elements skipped between samples. There is also
    /// no guarantee on the exact timing of the samples.
    ///
    /// # Safety
    /// The output stream is non-deterministic in which elements are sampled, since this
    /// is controlled by a clock.
    pub unsafe fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
    ) -> Stream<T, L, Unbounded, Order> {
        let samples = unsafe {
            // SAFETY: source of intentional non-determinism
            self.location.source_interval(interval)
        };

        // Emit the current batch only in ticks where the interval timer has
        // fired (i.e. `samples` is non-empty).
        let tick = self.location.tick();
        unsafe {
            // SAFETY: source of intentional non-determinism
            self.tick_batch(&tick)
                .continue_if(samples.tick_batch(&tick).first())
                .all_ticks()
        }
    }

    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Safety
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub unsafe fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
    ) -> Optional<(), L, Unbounded>
    where
        Order: MinOrder<NoOrder, Min = NoOrder>,
    {
        let tick = self.location.tick();

        // Record the wall-clock time at which the most recent element was
        // processed; the element values themselves are irrelevant, so the
        // fold is commutative.
        let latest_received = self.fold_commutative(
            q!(|| None),
            q!(|latest, _| {
                *latest = Some(Instant::now());
            }),
        );

        // Emit () when no element has ever arrived, or when the most recent
        // arrival is older than `duration`.
        unsafe {
            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
            latest_received.latest_tick(&tick)
        }
        .filter_map(q!(move |latest_received| {
            if let Some(latest_received) = latest_received {
                if Instant::now().duration_since(latest_received) > duration {
                    Some(())
                } else {
                    None
                }
            } else {
                Some(())
            }
        }))
        .latest()
    }
}
1737
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
    /// Consumes each element of the stream with the closure `f`. This is a
    /// terminal operator: it registers a `ForEach` leaf in the flow graph
    /// rather than producing a new stream.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
        let f = f.splice_fn1_ctx(&self.location).into();
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // Panics if the flow was already finalized.
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::ForEach {
                // Unpersist the top-level stream so the closure observes each
                // element once rather than a replay of all prior elements.
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                f,
                metadata,
            });
    }

    /// Sends every element of the stream into `sink` (a staged
    /// [`futures::Sink`], e.g. a network connection or file writer). This is
    /// a terminal operator: it registers a `DestSink` leaf in the flow graph.
    pub fn dest_sink<S: Unpin + futures::Sink<T> + 'a>(
        self,
        sink: impl QuotedWithContext<'a, S, L>,
    ) {
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // Panics if the flow was already finalized.
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            });
    }
}
1775
impl<'a, T, L: Location<'a>, Order> Stream<T, Tick<L>, Bounded, Order> {
    /// Releases this tick-scoped, bounded stream into the enclosing
    /// asynchronous context, producing an [`Unbounded`] stream that carries
    /// the elements emitted across all ticks (via a `Persist` node).
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, Order> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Same as [`Stream::all_ticks`], but the output remains inside the
    /// [`Atomic`] section for this tick rather than the outer location.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, Order> {
        Stream::new(
            Atomic {
                tick: self.location.clone(),
            },
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Persists the elements of this stream across ticks, so that each tick
    /// observes the accumulated elements of all ticks so far. Requires
    /// `T: Clone` because elements are replayed in later ticks.
    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, Order>
    where
        T: Clone,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Delays the elements of this stream by one tick; they will appear in
    /// the next tick's batch instead of the current one.
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Wraps this stream in a `Delta` node. NOTE(review): presumably this
    /// emits only the elements that are new relative to the previous tick
    /// (the inverse of `persist`) — confirm against the IR documentation.
    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            self.location.clone(),
            HydroNode::Delta {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1832
1833pub fn serialize_bincode_with_type(is_demux: bool, t_type: syn::Type) -> syn::Expr {
1834 let root = get_this_crate();
1835
1836 if is_demux {
1837 parse_quote! {
1838 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
1839 |(id, data)| {
1840 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
1841 }
1842 )
1843 }
1844 } else {
1845 parse_quote! {
1846 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
1847 |data| {
1848 #root::runtime_support::bincode::serialize(&data).unwrap().into()
1849 }
1850 )
1851 }
1852 }
1853}
1854
1855fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
1856 serialize_bincode_with_type(is_demux, stageleft::quote_type::<T>())
1857}
1858
1859pub fn deserialize_bincode_with_type(tagged: Option<syn::Type>, t_type: syn::Type) -> syn::Expr {
1860 let root = get_this_crate();
1861
1862 if let Some(c_type) = tagged {
1863 parse_quote! {
1864 |res| {
1865 let (id, b) = res.unwrap();
1866 (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
1867 }
1868 }
1869 } else {
1870 parse_quote! {
1871 |res| {
1872 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
1873 }
1874 }
1875 }
1876}
1877
1878pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<syn::Type>) -> syn::Expr {
1879 deserialize_bincode_with_type(tagged, stageleft::quote_type::<T>())
1880}
1881
1882impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
1883 pub fn send_bincode<L2: Location<'a>, CoreType>(
1884 self,
1885 other: &L2,
1886 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, Order::Min>
1887 where
1888 L::Root: CanSend<'a, L2, In<CoreType> = T>,
1889 CoreType: Serialize + DeserializeOwned,
1890 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1891 {
1892 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
1893
1894 let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(L::Root::tagged_type()));
1895
1896 Stream::new(
1897 other.clone(),
1898 HydroNode::Network {
1899 from_key: None,
1900 to_location: other.id(),
1901 to_key: None,
1902 serialize_fn: serialize_pipeline.map(|e| e.into()),
1903 instantiate_fn: DebugInstantiate::Building,
1904 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1905 input: Box::new(self.ir_node.into_inner()),
1906 metadata: other.new_node_metadata::<CoreType>(),
1907 },
1908 )
1909 }
1910
1911 pub fn send_bincode_external<L2: 'a, CoreType>(
1912 self,
1913 other: &ExternalProcess<L2>,
1914 ) -> ExternalBincodeStream<L::Out<CoreType>>
1915 where
1916 L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
1917 CoreType: Serialize + DeserializeOwned,
1918 // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
1919 {
1920 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
1921
1922 let metadata = other.new_node_metadata::<CoreType>();
1923
1924 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1925
1926 let external_key = flow_state_borrow.next_external_out;
1927 flow_state_borrow.next_external_out += 1;
1928
1929 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
1930
1931 let dummy_f: syn::Expr = syn::parse_quote!(());
1932
1933 leaves.push(HydroLeaf::ForEach {
1934 f: dummy_f.into(),
1935 input: Box::new(HydroNode::Network {
1936 from_key: None,
1937 to_location: other.id(),
1938 to_key: Some(external_key),
1939 serialize_fn: serialize_pipeline.map(|e| e.into()),
1940 instantiate_fn: DebugInstantiate::Building,
1941 deserialize_fn: None,
1942 input: Box::new(self.ir_node.into_inner()),
1943 metadata: metadata.clone(),
1944 }),
1945 metadata,
1946 });
1947
1948 ExternalBincodeStream {
1949 process_id: other.id,
1950 port_id: external_key,
1951 _phantom: PhantomData,
1952 }
1953 }
1954
1955 pub fn send_bytes<L2: Location<'a>>(
1956 self,
1957 other: &L2,
1958 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, Order::Min>
1959 where
1960 L::Root: CanSend<'a, L2, In<Bytes> = T>,
1961 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1962 {
1963 let root = get_this_crate();
1964 Stream::new(
1965 other.clone(),
1966 HydroNode::Network {
1967 from_key: None,
1968 to_location: other.id(),
1969 to_key: None,
1970 serialize_fn: None,
1971 instantiate_fn: DebugInstantiate::Building,
1972 deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
1973 let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
1974 Some(expr.into())
1975 } else {
1976 let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
1977 Some(expr.into())
1978 },
1979 input: Box::new(self.ir_node.into_inner()),
1980 metadata: other.new_node_metadata::<Bytes>(),
1981 },
1982 )
1983 }
1984
1985 pub fn send_bytes_external<L2: 'a>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
1986 where
1987 L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
1988 {
1989 let metadata = other.new_node_metadata::<Bytes>();
1990
1991 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1992 let external_key = flow_state_borrow.next_external_out;
1993 flow_state_borrow.next_external_out += 1;
1994
1995 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
1996
1997 let dummy_f: syn::Expr = syn::parse_quote!(());
1998
1999 leaves.push(HydroLeaf::ForEach {
2000 f: dummy_f.into(),
2001 input: Box::new(HydroNode::Network {
2002 from_key: None,
2003 to_location: other.id(),
2004 to_key: Some(external_key),
2005 serialize_fn: None,
2006 instantiate_fn: DebugInstantiate::Building,
2007 deserialize_fn: None,
2008 input: Box::new(self.ir_node.into_inner()),
2009 metadata: metadata.clone(),
2010 }),
2011 metadata,
2012 });
2013
2014 ExternalBytesPort {
2015 process_id: other.id,
2016 port_id: external_key,
2017 }
2018 }
2019
2020 pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
2021 self,
2022 other: &L2,
2023 ) -> Stream<CoreType, L2, Unbounded, Order::Min>
2024 where
2025 L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
2026 CoreType: Serialize + DeserializeOwned,
2027 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
2028 {
2029 self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
2030 }
2031
2032 pub fn send_bytes_anonymous<L2: Location<'a>, Tag>(
2033 self,
2034 other: &L2,
2035 ) -> Stream<Bytes, L2, Unbounded, Order::Min>
2036 where
2037 L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
2038 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
2039 {
2040 self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
2041 }
2042
2043 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2044 pub fn broadcast_bincode<C2: 'a>(
2045 self,
2046 other: &Cluster<'a, C2>,
2047 ) -> Stream<
2048 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2049 Cluster<'a, C2>,
2050 Unbounded,
2051 Order::Min,
2052 >
2053 where
2054 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2055 T: Clone + Serialize + DeserializeOwned,
2056 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2057 {
2058 let ids = other.members();
2059
2060 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2061 ::std::clone::Clone::clone(id),
2062 ::std::clone::Clone::clone(&b)
2063 ))))
2064 .send_bincode(other)
2065 }
2066
2067 pub fn broadcast_bincode_anonymous<C2: 'a, Tag>(
2068 self,
2069 other: &Cluster<'a, C2>,
2070 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order::Min>
2071 where
2072 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2073 T: Clone + Serialize + DeserializeOwned,
2074 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2075 {
2076 self.broadcast_bincode(other).map(q!(|(_, b)| b))
2077 }
2078
2079 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2080 pub fn broadcast_bytes<C2: 'a>(
2081 self,
2082 other: &Cluster<'a, C2>,
2083 ) -> Stream<
2084 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2085 Cluster<'a, C2>,
2086 Unbounded,
2087 Order::Min,
2088 >
2089 where
2090 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2091 T: Clone,
2092 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2093 {
2094 let ids = other.members();
2095
2096 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2097 ::std::clone::Clone::clone(id),
2098 ::std::clone::Clone::clone(&b)
2099 ))))
2100 .send_bytes(other)
2101 }
2102
2103 pub fn broadcast_bytes_anonymous<C2: 'a, Tag>(
2104 self,
2105 other: &Cluster<'a, C2>,
2106 ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, Order::Min>
2107 where
2108 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2109 + 'a,
2110 T: Clone,
2111 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2112 {
2113 self.broadcast_bytes(other).map(q!(|(_, b)| b))
2114 }
2115}
2116
2117#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
2118impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder> {
2119 pub fn round_robin_bincode<C2: 'a>(
2120 self,
2121 other: &Cluster<'a, C2>,
2122 ) -> Stream<
2123 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2124 Cluster<'a, C2>,
2125 Unbounded,
2126 <TotalOrder as MinOrder<
2127 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2128 >>::Min,
2129 >
2130 where
2131 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2132 T: Clone + Serialize + DeserializeOwned,
2133 TotalOrder:
2134 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2135 {
2136 let ids = other.members();
2137
2138 self.enumerate()
2139 .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2140 .send_bincode(other)
2141 }
2142
2143 pub fn round_robin_bincode_anonymous<C2: 'a, Tag>(
2144 self,
2145 other: &Cluster<'a, C2>,
2146 ) -> Stream<
2147 T,
2148 Cluster<'a, C2>,
2149 Unbounded,
2150 <TotalOrder as MinOrder<
2151 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2152 >>::Min,
2153 >
2154 where
2155 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2156 T: Clone + Serialize + DeserializeOwned,
2157 TotalOrder:
2158 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2159 {
2160 self.round_robin_bincode(other).map(q!(|(_, b)| b))
2161 }
2162
2163 pub fn round_robin_bytes<C2: 'a>(
2164 self,
2165 other: &Cluster<'a, C2>,
2166 ) -> Stream<
2167 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2168 Cluster<'a, C2>,
2169 Unbounded,
2170 <TotalOrder as MinOrder<
2171 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2172 >>::Min,
2173 >
2174 where
2175 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2176 T: Clone,
2177 TotalOrder:
2178 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2179 {
2180 let ids = other.members();
2181
2182 self.enumerate()
2183 .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2184 .send_bytes(other)
2185 }
2186
2187 pub fn round_robin_bytes_anonymous<C2: 'a, Tag>(
2188 self,
2189 other: &Cluster<'a, C2>,
2190 ) -> Stream<
2191 Bytes,
2192 Cluster<'a, C2>,
2193 Unbounded,
2194 <TotalOrder as MinOrder<
2195 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2196 >>::Min,
2197 >
2198 where
2199 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2200 + 'a,
2201 T: Clone,
2202 TotalOrder:
2203 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2204 {
2205 self.round_robin_bytes(other).map(q!(|(_, b)| b))
2206 }
2207}
2208
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    // Marker types naming the two processes in the test topology.
    struct P1 {}
    struct P2 {}

    // Payload type exercised over the network; must be serde-serializable
    // since `send_bincode` requires Serialize + DeserializeOwned.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // End-to-end test: generate 0..10 on one process, send them (bincode) to a
    // second process, forward them to an external port, and assert the external
    // receiver observes all ten values in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external_process::<P2>();

        // Source on P1 -> network hop to P2 -> external output port.
        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        // Place all three locations on localhost and materialize the flow.
        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external port before starting so no output is missed.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        // P1 -> P2 is a single TCP-like channel between two processes, so
        // TotalOrder is preserved and values arrive as 0..10 in sequence.
        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
}