pub struct Receiver<T> { /* private fields */ }
Expand description
Receiving-half of the broadcast
channel.
Must not be used concurrently. Messages may be retrieved using
recv
.
Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
Implementations
sourceimpl<T> Receiver<T> where
T: Clone,
impl<T> Receiver<T> where
T: Clone,
sourcepub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
Attempts to return a pending value on this receiver without awaiting.
This is useful for a flavor of “optimistic check” before deciding to await on a receiver.
Compared with recv
, this function has three failure cases instead of one
(one for closed, one for an empty buffer, one for a lagging receiver).
Err(TryRecvError::Closed)
is returned when all Sender
halves have
dropped, indicating that no further values can be sent on the channel.
If the Receiver
handle falls behind, once the channel is full, newly
sent values will overwrite old values. At this point, a call to recv
will return with Err(TryRecvError::Lagged)
and the Receiver
’s
internal cursor is updated to point to the oldest value still held by
the channel. A subsequent call to try_recv
will return this value
unless it has been since overwritten. If there are no values to
receive, Err(TryRecvError::Empty)
is returned.
Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(16);
assert!(rx.try_recv().is_err());
tx.send(10).unwrap();
let value = rx.try_recv().unwrap();
assert_eq!(10, value);
}
sourcepub async fn recv(&mut self) -> Result<T, RecvError>
pub async fn recv(&mut self) -> Result<T, RecvError>
Receives the next value for this receiver.
Each Receiver
handle will receive a clone of all values sent
after it has subscribed.
Err(RecvError::Closed)
is returned when all Sender
halves have
dropped, indicating that no further values can be sent on the channel.
If the Receiver
handle falls behind, once the channel is full, newly
sent values will overwrite old values. At this point, a call to recv
will return with Err(RecvError::Lagged)
and the Receiver
’s
internal cursor is updated to point to the oldest value still held by
the channel. A subsequent call to recv
will return this value
unless it has been since overwritten.
Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
Handling lag
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();
// The receiver lagged behind
assert!(rx.recv().await.is_err());
// At this point, we can abort or continue with lost messages
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
}
sourceimpl<T: Clone> Receiver<T>
impl<T: Clone> Receiver<T>
sourcepub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>>
pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>>
Convert the receiver into a Stream
.
The conversion allows using Receiver
with APIs that require stream
values.
Examples
use tokio::stream::StreamExt;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, rx) = broadcast::channel(128);
tokio::spawn(async move {
for i in 0..10_i32 {
tx.send(i).unwrap();
}
});
// Streams must be pinned to iterate.
tokio::pin! {
let stream = rx
.into_stream()
.filter(Result::is_ok)
.map(Result::unwrap)
.filter(|v| v % 2 == 0)
.map(|v| v + 1);
}
while let Some(i) = stream.next().await {
println!("{}", i);
}
}
Trait Implementations
impl<T: Send> Send for Receiver<T>
impl<T: Send> Sync for Receiver<T>
Auto Trait Implementations
impl<T> !RefUnwindSafe for Receiver<T>
impl<T> Unpin for Receiver<T>
impl<T> !UnwindSafe for Receiver<T>
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcepub fn borrow_mut(&mut self) -> &mut T
pub fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<St> StreamExt for St where
St: Stream + ?Sized,
impl<St> StreamExt for St where
St: Stream + ?Sized,
sourcefn next(&mut self) -> Next<'_, Self> where
Self: Unpin,
fn next(&mut self) -> Next<'_, Self> where
Self: Unpin,
Consumes and returns the next value in the stream or None
if the
stream is finished. Read more
sourcefn try_next<T, E>(&mut self) -> TryNext<'_, Self> where
Self: Stream<Item = Result<T, E>> + Unpin,
fn try_next<T, E>(&mut self) -> TryNext<'_, Self> where
Self: Stream<Item = Result<T, E>> + Unpin,
Consumes and returns the next item in the stream. If an error is encountered before the next item, the error is returned instead. Read more
sourcefn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T,
Self: Sized,
fn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T,
Self: Sized,
Maps this stream’s items to a different type, returning a new stream of the resulting type. Read more
sourcefn merge<U>(self, other: U) -> Merge<Self, U> where
U: Stream<Item = Self::Item>,
Self: Sized,
fn merge<U>(self, other: U) -> Merge<Self, U> where
U: Stream<Item = Self::Item>,
Self: Sized,
Combine two streams into one by interleaving the output of both as it is produced. Read more
sourcefn filter<F>(self, f: F) -> Filter<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
fn filter<F>(self, f: F) -> Filter<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
Filters the values produced by this stream according to the provided predicate. Read more
sourcefn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
Filters the values produced by this stream while simultaneously mapping them to a different type according to the provided closure. Read more
sourcefn fuse(self) -> Fuse<Self> where
Self: Sized,
fn fuse(self) -> Fuse<Self> where
Self: Sized,
Creates a stream which ends after the first None
. Read more
sourcefn take(self, n: usize) -> Take<Self> where
Self: Sized,
fn take(self, n: usize) -> Take<Self> where
Self: Sized,
Creates a new stream of at most n
items of the underlying stream. Read more
sourcefn take_while<F>(self, f: F) -> TakeWhile<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
fn take_while<F>(self, f: F) -> TakeWhile<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
Take elements from this stream while the provided predicate
resolves to true
. Read more
sourcefn skip(self, n: usize) -> Skip<Self> where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self> where
Self: Sized,
Creates a new stream that will skip the n
first items of the
underlying stream. Read more
sourcefn skip_while<F>(self, f: F) -> SkipWhile<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
Skip elements from the underlying stream while the provided predicate
resolves to true
. Read more
sourcefn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
Tests if every element of the stream matches a predicate. Read more
sourcefn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
Tests if any element of the stream matches a predicate. Read more
sourcefn chain<U>(self, other: U) -> Chain<Self, U> where
U: Stream<Item = Self::Item>,
Self: Sized,
fn chain<U>(self, other: U) -> Chain<Self, U> where
U: Stream<Item = Self::Item>,
Self: Sized,
Combine two streams into one by first returning all values from the first stream then all values from the second stream. Read more
sourcefn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
A combinator that applies a function to every element in a stream producing a single, final value. Read more