Skip to content

Commit

Permalink
Adding in SignalExt::stop_if method
Browse files Browse the repository at this point in the history
  • Loading branch information
Pauan committed Sep 7, 2023
1 parent 0b44811 commit a1738f7
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 0 deletions.
76 changes: 76 additions & 0 deletions src/signal/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,41 @@ pub trait SignalExt: Signal {
}
}

/// Conditionally stops the `Signal`.
///
/// For each value in `self` it will call the `test` function.
///
/// If `test` returns `true` then the `Signal` will stop emitting
/// any future values.
///
/// The value which is passed to `test` is always emitted no matter
/// what.
///
/// # Examples
///
/// ```rust
/// # use futures_signals::signal::{always, SignalExt};
/// # let input = always(1);
/// // Stops the signal when x is above 5
/// let output = input.stop_if(|x| *x > 5);
/// ```
///
/// # Performance
///
/// This is ***extremely*** efficient: it is *guaranteed* constant time, and it does not do
/// any heap allocation.
#[inline]
fn stop_if<F>(self, test: F) -> StopIf<Self, F>
where F: FnMut(&Self::Item) -> bool,
Self: Sized {
StopIf {
signal: self,
stopped: false,
test,
}
}


#[inline]
#[track_caller]
#[cfg(feature = "debug")]
Expand Down Expand Up @@ -1057,6 +1092,47 @@ impl<A, B, C> Signal for Map<A, B>
}


#[pin_project(project = StopIfProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
pub struct StopIf<A, B> {
#[pin]
signal: A,
stopped: bool,
test: B,
}

impl<A, B> Signal for StopIf<A, B>
where A: Signal,
B: FnMut(&A::Item) -> bool {
type Item = A::Item;

fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let StopIfProj { signal, stopped, test } = self.project();

if *stopped {
Poll::Ready(None)

} else {
match signal.poll_change(cx) {
Poll::Ready(Some(value)) => {
if test(&value) {
*stopped = true;
}

Poll::Ready(Some(value))
},
Poll::Ready(None) => {
*stopped = true;
Poll::Ready(None)
},
Poll::Pending => Poll::Pending,
}
}
}
}


#[pin_project(project = EqProj)]
#[derive(Debug)]
#[must_use = "Signals do nothing unless polled"]
Expand Down
1 change: 1 addition & 0 deletions tests/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mod neq;
mod option;
mod result;
mod sample_stream_cloned;
mod stop_if;
mod switch_signal_vec;
mod throttle;
36 changes: 36 additions & 0 deletions tests/signal/stop_if.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::task::Poll;
use futures_signals::signal::SignalExt;
use crate::util;


#[test]
fn test_stop_if() {
let input = util::Source::new(vec![
Poll::Ready(0),
Poll::Pending,
Poll::Pending,
Poll::Ready(1),
Poll::Ready(5),
Poll::Pending,
Poll::Pending,
Poll::Pending,
Poll::Pending,
Poll::Pending,
Poll::Pending,
Poll::Ready(0),
Poll::Pending,
Poll::Ready(3),
Poll::Pending,
]);

let output = input.stop_if(move |x| *x == 5);

util::assert_signal_eq(output, vec![
Poll::Ready(Some(0)),
Poll::Pending,
Poll::Pending,
Poll::Ready(Some(1)),
Poll::Ready(Some(5)),
Poll::Ready(None),
]);
}

0 comments on commit a1738f7

Please sign in to comment.