1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
use super::{BlockingError, BlockingImpl};
use futures::Poll;
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use tokio_executor::Enter;
// Per-thread slot holding the blocking implementation that `blocking`
// dispatches to. It starts out as `super::default_blocking`; `set_default`
// swaps in a replacement and the `DefaultGuard` it returns writes the previous
// value back when dropped.
thread_local! {
static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
}
/// Guard that restores the previously installed blocking implementation when
/// it is dropped.
///
/// A guard is produced by `set_default`. It remembers the implementation that
/// was active before the override and, in its `Drop` impl, writes that value
/// back into the thread-local slot. Because the restore happens on drop, it
/// also runs when the scope is left by a panic.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub struct DefaultGuard<'a> {
// The implementation that was installed before this guard was created; it is
// put back on drop.
prior: BlockingImpl,
// Gives the guard a lifetime parameter without storing an actual reference.
_lifetime: PhantomData<&'a ()>,
}
/// Install `blocking` as the current thread's blocking implementation.
///
/// The implementation that was active before the call is captured in the
/// returned [`DefaultGuard`]; dropping the guard puts it back.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
    // Swap the new implementation in and keep hold of the one it displaced.
    let prior = CURRENT.with(|slot| slot.replace(blocking));

    DefaultGuard {
        prior,
        _lifetime: PhantomData,
    }
}
/// Run `f` with `blocking` installed as the blocking implementation.
///
/// The override is put in place before `f` is called and undone when this
/// function returns (or unwinds). The `enter` handle is passed through to the
/// closure unchanged, and the closure's return value is returned to the caller.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
where
    F: FnOnce(&mut Enter) -> R,
{
    // Bound to a named variable (not `_`) so the guard stays alive until the
    // end of the function and only then restores the prior implementation.
    let _restore_on_exit = set_default(blocking);

    f(enter)
}
/// Enter a blocking section of code.
///
/// The `blocking` function annotates a section of code that performs a blocking
/// operation, either by issuing a blocking syscall or by performing a long
/// running CPU-bound computation.
///
/// When the `blocking` function enters, it hands off the responsibility of
/// processing the current work queue to another thread. Then, it calls the
/// supplied closure. The closure is permitted to block indefinitely.
///
/// If the maximum number of concurrent `blocking` calls has been reached, then
/// `NotReady` is returned and the task is notified once existing `blocking`
/// calls complete. The maximum value is specified when creating a thread pool
/// using [`Builder::max_blocking`][build].
///
/// NB: The entire task that called `blocking` is blocked whenever the supplied
/// closure blocks, even if you have used future combinators such as `select` -
/// the other futures in this task will not make progress until the closure
/// returns.
/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
/// using `futures::sync::oneshot::spawn`).
///
/// [build]: struct.Builder.html#method.max_blocking
///
/// # Return
///
/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
/// `T` is the closure's return value.
///
/// If the thread pool has shut down, `Err` is returned.
///
/// If the number of concurrent `blocking` calls has reached the maximum,
/// `Ok(NotReady)` is returned and the current task is notified when a call to
/// `blocking` will succeed.
///
/// If `blocking` is called from outside the context of a Tokio thread pool,
/// `Err` is returned.
///
/// # Background
///
/// By default, the Tokio thread pool expects that tasks will only run for short
/// periods at a time before yielding back to the thread pool. This is the basic
/// premise of cooperative multitasking.
///
/// However, it is common to want to perform a blocking operation while
/// processing an asynchronous computation. Examples of blocking operations
/// include:
///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU bound computation, like cryptographic encryption or
///   decryption.
///
/// One option for dealing with blocking operations in an asynchronous context
/// is to use a thread pool dedicated to performing these operations. This is
/// not ideal as it requires bidirectional message passing as well as a channel
/// to communicate which adds a level of buffering.
///
/// Instead, `blocking` hands off the responsibility of processing the work queue
/// to another thread. This hand off is light compared to a channel and does not
/// require buffering.
///
/// # Examples
///
/// Block on receiving a message from a `std` channel. This example is a little
/// silly as using the non-blocking channel from the `futures` crate would make
/// more sense. The blocking receive can be replaced with any blocking operation
/// that needs to be performed.
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_threadpool;
///
/// use tokio_threadpool::{ThreadPool, blocking};
///
/// use futures::Future;
/// use futures::future::{lazy, poll_fn};
///
/// use std::sync::mpsc;
/// use std::thread;
/// use std::time::Duration;
///
/// pub fn main() {
///     // This is a *blocking* channel
///     let (tx, rx) = mpsc::channel();
///
///     // Spawn a thread to send a message
///     thread::spawn(move || {
///         thread::sleep(Duration::from_millis(500));
///         tx.send("hello").unwrap();
///     });
///
///     let pool = ThreadPool::new();
///
///     pool.spawn(lazy(move || {
///         // Because `blocking` returns `Poll`, it is intended to be used
///         // from the context of a `Future` implementation. Since we don't
///         // have a complicated requirement, we can use `poll_fn` in this
///         // case.
///         poll_fn(move || {
///             blocking(|| {
///                 let msg = rx.recv().unwrap();
///                 println!("message = {}", msg);
///             }).map_err(|_| panic!("the threadpool shut down"))
///         })
///     }));
///
///     // Wait for the task we just spawned to complete.
///     pool.shutdown_on_idle().wait().unwrap();
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where
    F: FnOnce() -> T,
{
    CURRENT.with(|slot| {
        // The implementation currently installed for this thread.
        let run_blocking = slot.get();

        // The installed implementation is stored in the thread-local as one
        // non-generic value, so it cannot be generic over the return type `T`
        // of the user's function. To bridge that gap we hand it a closure
        // with no return value: the shim calls the user's function and parks
        // the result in `output`, which is unpacked once the blocking call
        // has finished.
        let mut pending = Some(f);
        let mut output = None;

        {
            // Scope the mutable borrow of `output` so it ends before the
            // result is read back below.
            let result_slot = &mut output;
            let mut shim = move || {
                // `f` is `FnOnce`, so it is moved out of the `Option` on the
                // first (and only expected) invocation.
                let func = pending
                    .take()
                    .expect("blocking closure invoked twice; this is a bug!");
                *result_slot = Some(func());
            };

            // Propagates `Err` and `NotReady` to the caller as-is.
            try_ready!(run_blocking(&mut shim));
        }

        // The implementation reported `Ready`, so the shim must have run.
        let value = output
            .expect("blocking function finished, but return value was unset; this is a bug!");
        Ok(value.into())
    })
}
// === impl DefaultGuard ===
impl<'a> fmt::Debug for DefaultGuard<'a> {
    /// Prints a fixed, opaque placeholder; the stored implementation is not
    /// shown.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` is used so that any width/alignment flags on the format spec
        // are applied to the placeholder text.
        formatter.pad("DefaultGuard { .. }")
    }
}
impl<'a> Drop for DefaultGuard<'a> {
    /// Writes the remembered implementation back into the thread-local slot.
    fn drop(&mut self) {
        let prior = self.prior;

        // `try_with` instead of `with`: if the thread-local has already been
        // torn down there is nothing left to restore, and we are almost
        // certainly panicking anyway, so the access error is deliberately
        // ignored.
        let _ = CURRENT.try_with(move |slot| slot.set(prior));
    }
}