Expand description
Synchronization primitives for use in asynchronous contexts.
Tokio programs tend to be organized as a set of tasks where each task operates independently and may be executed on separate physical threads. The synchronization primitives provided in this module permit these independent tasks to communicate together.
Message passing
The most common form of synchronization in a Tokio program is message passing. Two tasks operate independently and send messages to each other to synchronize. Doing so has the advantage of avoiding shared state.
Message passing is implemented using channels. A channel supports sending a message from one producer task to one or more consumer tasks. There are a few flavors of channels provided by Tokio. Each channel flavor supports different message passing patterns. When a channel supports multiple producers, many separate tasks may send messages. When a channel supports multiple consumers, many different separate tasks may receive messages.
Tokio provides many different channel flavors as different message passing patterns are best handled with different implementations.
oneshot
channel
The oneshot
channel supports sending a single value from a
single producer to a single consumer. This channel is usually used to send
the result of a computation to a waiter.
Example: using a oneshot
channel to receive the result of a
computation.
use tokio::sync::oneshot;
async fn some_computation() -> String {
"represents the result of the computation".to_string()
}
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let res = some_computation().await;
tx.send(res).unwrap();
});
// Do other work while the computation is happening in the background
// Wait for the computation result
let res = rx.await.unwrap();
}
Note, if the task produces a computation result as its final
action before terminating, the JoinHandle
can be used to
receive that value instead of allocating resources for the
oneshot
channel. Awaiting on JoinHandle
returns Result
. If
the task panics, the Joinhandle
yields Err
with the panic
cause.
Example:
async fn some_computation() -> String {
"the result of the computation".to_string()
}
#[tokio::main]
async fn main() {
let join_handle = tokio::spawn(async move {
some_computation().await
});
// Do other work while the computation is happening in the background
// Wait for the computation result
let res = join_handle.await.unwrap();
}
mpsc
channel
The mpsc
channel supports sending many values from many
producers to a single consumer. This channel is often used to send work to a
task or to receive the result of many computations.
This is also the channel you should use if you want to send many messages from a single producer to a single consumer. There is no dedicated spsc channel.
Example: using an mpsc to incrementally stream the results of a series of computations.
use tokio::sync::mpsc;
async fn some_computation(input: u32) -> String {
format!("the result of computation {}", input)
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
for i in 0..10 {
let res = some_computation(i).await;
tx.send(res).await.unwrap();
}
});
while let Some(res) = rx.recv().await {
println!("got = {}", res);
}
}
The argument to mpsc::channel
is the channel capacity. This is the maximum
number of values that can be stored in the channel pending receipt at any
given time. Properly setting this value is key in implementing robust
programs as the channel capacity plays a critical part in handling back
pressure.
A common concurrency pattern for resource management is to spawn a task dedicated to managing that resource and using message passing between other tasks to interact with the resource. The resource may be anything that may not be concurrently used. Some examples include a socket and program state. For example, if multiple tasks need to send data over a single socket, spawn a task to manage the socket and use a channel to synchronize.
Example: sending data from many tasks over a single socket using message passing.
use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut socket = TcpStream::connect("www.example.com:1234").await?;
let (tx, mut rx) = mpsc::channel(100);
for _ in 0..10 {
// Each task needs its own `tx` handle. This is done by cloning the
// original handle.
let tx = tx.clone();
tokio::spawn(async move {
tx.send(&b"data to write"[..]).await.unwrap();
});
}
// The `rx` half of the channel returns `None` once **all** `tx` clones
// drop. To ensure `None` is returned, drop the handle owned by the
// current task. If this `tx` handle is not dropped, there will always
// be a single outstanding `tx` handle.
drop(tx);
while let Some(res) = rx.recv().await {
socket.write_all(res).await?;
}
Ok(())
}
The mpsc
and oneshot
channels can be combined to
provide a request / response type synchronization pattern with a shared
resource. A task is spawned to synchronize a resource and waits on commands
received on a mpsc
channel. Each command includes a
oneshot
Sender
on which the result of the command is sent.
Example: use a task to synchronize a u64
counter. Each task sends an
“fetch and increment” command. The counter value before the increment is
sent over the provided oneshot
channel.
use tokio::sync::{oneshot, mpsc};
use Command::Increment;
enum Command {
Increment,
// Other commands can be added here
}
#[tokio::main]
async fn main() {
let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
// Spawn a task to manage the counter
tokio::spawn(async move {
let mut counter: u64 = 0;
while let Some((cmd, response)) = cmd_rx.recv().await {
match cmd {
Increment => {
let prev = counter;
counter += 1;
response.send(prev).unwrap();
}
}
}
});
let mut join_handles = vec![];
// Spawn tasks that will send the increment command.
for _ in 0..10 {
let cmd_tx = cmd_tx.clone();
join_handles.push(tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
let res = resp_rx.await.unwrap();
println!("previous value = {}", res);
}));
}
// Wait for all tasks to complete
for join_handle in join_handles.drain(..) {
join_handle.await.unwrap();
}
}
broadcast
channel
The broadcast
channel supports sending many values from
many producers to many consumers. Each consumer will receive
each value. This channel can be used to implement “fan out” style
patterns common with pub / sub or “chat” systems.
This channel tends to be used less often than oneshot
and mpsc
but still
has its use cases.
This is also the channel you should use if you want to broadcast values from a single producer to many consumers. There is no dedicated spmc broadcast channel.
Basic usage
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
watch
channel
The watch
channel supports sending many values from a single
producer to many consumers. However, only the most recent value is
stored in the channel. Consumers are notified when a new value is sent, but
there is no guarantee that consumers will see all values.
The watch
channel is similar to a broadcast
channel with capacity 1.
Use cases for the watch
channel include broadcasting configuration
changes or signalling program state changes, such as transitioning to
shutdown.
Example: use a watch
channel to notify tasks of configuration
changes. In this example, a configuration file is checked periodically. When
the file changes, the configuration changes are signalled to consumers.
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};
use std::io;
#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
timeout: Duration,
}
impl Config {
async fn load_from_file() -> io::Result<Config> {
// file loading and deserialization logic here
}
}
async fn my_async_operation() {
// Do something here
}
#[tokio::main]
async fn main() {
// Load initial configuration value
let mut config = Config::load_from_file().await.unwrap();
// Create the watch channel, initialized with the loaded configuration
let (tx, rx) = watch::channel(config.clone());
// Spawn a task to monitor the file.
tokio::spawn(async move {
loop {
// Wait 10 seconds between checks
time::sleep(Duration::from_secs(10)).await;
// Load the configuration file
let new_config = Config::load_from_file().await.unwrap();
// If the configuration changed, send the new config value
// on the watch channel.
if new_config != config {
tx.send(new_config.clone()).unwrap();
config = new_config;
}
}
});
let mut handles = vec![];
// Spawn tasks that runs the async operation for at most `timeout`. If
// the timeout elapses, restart the operation.
//
// The task simultaneously watches the `Config` for changes. When the
// timeout duration changes, the timeout is updated without restarting
// the in-flight operation.
for _ in 0..5 {
// Clone a config watch handle for use in this task
let mut rx = rx.clone();
let handle = tokio::spawn(async move {
// Start the initial operation and pin the future to the stack.
// Pinning to the stack is required to resume the operation
// across multiple calls to `select!`
let op = my_async_operation();
tokio::pin!(op);
// Get the initial config value
let mut conf = rx.borrow().clone();
let mut op_start = Instant::now();
let sleep = time::sleep_until(op_start + conf.timeout);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => {
// The operation elapsed. Restart it
op.set(my_async_operation());
// Track the new start time
op_start = Instant::now();
// Restart the timeout
sleep.set(time::sleep_until(op_start + conf.timeout));
}
_ = rx.changed() => {
conf = rx.borrow().clone();
// The configuration has been updated. Update the
// `sleep` using the new `timeout` value.
sleep.as_mut().reset(op_start + conf.timeout);
}
_ = &mut op => {
// The operation completed!
return
}
}
}
});
handles.push(handle);
}
for handle in handles.drain(..) {
handle.await.unwrap();
}
}
State synchronization
The remaining synchronization primitives focus on synchronizing state.
These are asynchronous equivalents to versions provided by std
. They
operate in a similar way as their std
counterparts but will wait
asynchronously instead of blocking the thread.
-
Barrier
Ensures multiple tasks will wait for each other to reach a point in the program, before continuing execution all together. -
Mutex
Mutual Exclusion mechanism, which ensures that at most one thread at a time is able to access some data. -
Notify
Basic task notification.Notify
supports notifying a receiving task without sending data. In this case, the task wakes up and resumes processing. -
RwLock
Provides a mutual exclusion mechanism which allows multiple readers at the same time, while allowing only one writer at a time. In some cases, this can be more efficient than a mutex. -
Semaphore
Limits the amount of concurrency. A semaphore holds a number of permits, which tasks may request in order to enter a critical section. Semaphores are useful for implementing limiting or bounding of any kind.
Modules
Structs
Semaphore::acquire
function.BarrierWaitResult
is returned by wait
when all tasks in the Barrier
have rendezvoused.Mutex
that has had a function applied to it via MutexGuard::map
.Mutex
-like type.Mutex
that has had a function applied to it via
OwnedMutexGuard::map
.Mutex
.Enums
OnceCell::set
.Semaphore::try_acquire
function.