1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
//! An asynchronous `Mutex`-like type.
//!
//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
//! release it at some later point in time.
//!
//! This allows you to do something along the lines of:
//!
//! ```rust,no_run
//! # #[macro_use]
//! # extern crate futures;
//! # extern crate tokio;
//! # use futures::{future, Poll, Async, Future, Stream};
//! use tokio::sync::lock::{Lock, LockGuard};
//! struct MyType<S> {
//!     lock: Lock<S>,
//! }
//!
//! impl<S> Future for MyType<S>
//!   where S: Stream<Item = u32> + Send + 'static
//! {
//!     type Item = ();
//!     type Error = ();
//!
//!     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
//!         match self.lock.poll_lock() {
//!             Async::Ready(mut guard) => {
//!                 tokio::spawn(future::poll_fn(move || {
//!                     let item = try_ready!(guard.poll().map_err(|_| ()));
//!                     println!("item = {:?}", item);
//!                     Ok(().into())
//!                 }));
//!                 Ok(().into())
//!             },
//!             Async::NotReady => Ok(Async::NotReady)
//!         }
//!     }
//! }
//! # fn main() {}
//! ```
//!
//! [`Lock`]: struct.Lock.html
//! [`LockGuard`]: struct.LockGuard.html
use futures::Async;
use semaphore;
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
/// An asynchronous mutual exclusion primitive useful for protecting shared data.
///
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
/// the data is only ever accessed when the mutex is locked.
///
/// Cloning a `Lock` yields another handle to the same protected value; every handle carries its
/// own, initially unacquired, permit.
#[derive(Debug)]
pub struct Lock<T> {
/// State shared by all handles: the protected value and the semaphore guarding it.
inner: Arc<State<T>>,
/// This handle's permit. `poll_lock` acquires it and then moves it into the returned guard,
/// leaving a fresh unacquired permit behind.
permit: semaphore::Permit,
}
/// A handle to a held `Lock`.
///
/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
/// internally keeps a reference-counted pointer to the original `Lock`, so even if the lock goes
/// away, the guard remains valid.
///
/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
/// will succeed yet again.
// The wrapped `Lock` handle is the one built by `poll_lock`: it owns the acquired permit and a
// clone of the `Arc` pointing at the shared state.
#[derive(Debug)]
pub struct LockGuard<T>(Lock<T>);
// A `Lock<T>` gives access to the `T` it protects (through the guard returned by `poll_lock`),
// so sending a `Lock<T>` to another thread, or sharing it between threads, can move access to
// `T` across threads. That is only acceptable when `T: Send`; without that bound these impls
// would be unsound.
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
// A shared `&LockGuard<T>` exposes `&T` through `Deref`, hence the additional `T: Sync` bound.
unsafe impl<T: Send + Sync> Sync for LockGuard<T> {}
/// State shared, via an `Arc`, by every `Lock` handle and every `LockGuard`.
#[derive(Debug)]
struct State<T> {
/// The protected value. Guards obtain `&T` / `&mut T` from this cell's raw pointer.
c: UnsafeCell<T>,
/// Semaphore that `Lock::new` creates with a single permit; guards release their permit back
/// to it on drop.
s: semaphore::Semaphore,
}
/// Compile-time check: a guard over a `Send` payload must itself be `Send`.
#[test]
fn bounds() {
    // Only type-checks when `G: Send`; there is nothing to do at runtime.
    fn assert_send<G>()
    where
        G: Send,
    {
    }

    assert_send::<LockGuard<u32>>();
}
impl<T> Lock<T> {
/// Creates a new lock in an unlocked state ready for use.
pub fn new(t: T) -> Self {
Self {
inner: Arc::new(State {
c: UnsafeCell::new(t),
s: semaphore::Semaphore::new(1),
}),
permit: semaphore::Permit::new(),
}
}
/// Try to acquire the lock.
///
/// If the lock is already held, the current task is notified when it is released.
pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}) {
return Async::NotReady;
}
// We want to move the acquired permit into the guard,
// and leave an unacquired one in self.
let acquired = Self {
inner: self.inner.clone(),
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
};
Async::Ready(LockGuard(acquired))
}
}
impl<T> Drop for LockGuard<T> {
    /// Gives the guard's permit back to the shared semaphore, releasing the lock.
    fn drop(&mut self) {
        if self.0.permit.is_acquired() {
            self.0.permit.release(&self.0.inner.s);
            return;
        }

        // A guard should always hold its permit. If it does not and the thread is already
        // panicking, stay quiet: a panic-while-panicking would only obscure the original failure.
        if !::std::thread::panicking() {
            unreachable!("Permit not held when LockGuard was dropped");
        }
    }
}
impl<T> From<T> for Lock<T> {
fn from(s: T) -> Self {
Self::new(s)
}
}
impl<T> Clone for Lock<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
permit: semaphore::Permit::new(),
}
}
}
impl<T> Default for Lock<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> Deref for LockGuard<T> {
    type Target = T;

    /// Borrows the protected value.
    ///
    /// # Panics
    ///
    /// Panics if the guard does not hold its permit.
    fn deref(&self) -> &T {
        assert!(self.0.permit.is_acquired());
        let data: *const T = self.0.inner.c.get();
        // SAFETY: the assertion above confirms this guard holds an acquired permit, and the
        // semaphore is created with a single permit in `Lock::new`, so the value is only
        // reachable through this guard while it is alive.
        unsafe { &*data }
    }
}
impl<T> DerefMut for LockGuard<T> {
    /// Mutably borrows the protected value.
    ///
    /// # Panics
    ///
    /// Panics if the guard does not hold its permit.
    fn deref_mut(&mut self) -> &mut T {
        assert!(self.0.permit.is_acquired());
        let data: *mut T = self.0.inner.c.get();
        // SAFETY: the assertion above confirms this guard holds an acquired permit, and the
        // semaphore is created with a single permit in `Lock::new`; together with `&mut self`
        // this makes the returned reference the only live access to the value.
        unsafe { &mut *data }
    }
}
impl<T> fmt::Display for LockGuard<T>
where
    T: fmt::Display,
{
    /// Formats the protected value exactly as `T`'s own `Display` impl would.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let value: &T = &**self;
        fmt::Display::fmt(value, f)
    }
}