1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
use crate::pin::Pin;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Release};
use crate::sys::futex::{futex_wait, futex_wake};
use crate::time::Duration;
const PARKED: u32 = u32::MAX;
const EMPTY: u32 = 0;
const NOTIFIED: u32 = 1;
pub struct Parker {
state: AtomicU32,
}
// Notes about memory ordering:
//
// Memory ordering is only relevant for the relative ordering of operations
// between different variables. Even Ordering::Relaxed guarantees a
// monotonic/consistent order when looking at just a single atomic variable.
//
// So, since this parker is just a single atomic variable, we only need to look
// at the ordering guarantees we need to provide to the 'outside world'.
//
// The only memory ordering guarantee that parking and unparking provide, is
// that things which happened before unpark() are visible on the thread
// returning from park() afterwards. Otherwise, it was effectively unparked
// before unpark() was called while still consuming the 'token'.
//
// In other words, unpark() needs to synchronize with the part of park() that
// consumes the token and returns.
//
// This is done with a release-acquire synchronization, by using
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
// Ordering::Acquire when checking for this state in park().
impl Parker {
/// Construct the futex parker. The UNIX parker implementation
/// requires this to happen in-place.
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Self { state: AtomicU32::new(EMPTY) });
}
// Assumes this is only called by the thread that owns the Parker,
// which means that `self.state != PARKED`.
pub unsafe fn park(self: Pin<&Self>) {
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
// first case.
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
loop {
// Wait for something to happen, assuming it's still set to PARKED.
futex_wait(&self.state, PARKED, None);
// Change NOTIFIED=>EMPTY and return in that case.
if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
return;
} else {
// Spurious wake up. We loop to try again.
}
}
}
// Assumes this is only called by the thread that owns the Parker,
// which means that `self.state != PARKED`. This implementation doesn't
// require `Pin`, but other implementations do.
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
// first case.
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
// Wait for something to happen, assuming it's still set to PARKED.
futex_wait(&self.state, PARKED, Some(timeout));
// This is not just a store, because we need to establish a
// release-acquire ordering with unpark().
if self.state.swap(EMPTY, Acquire) == NOTIFIED {
// Woke up because of unpark().
} else {
// Timeout or spurious wake up.
// We return either way, because we can't easily tell if it was the
// timeout or not.
}
}
// This implementation doesn't require `Pin`, but other implementations do.
#[inline]
pub fn unpark(self: Pin<&Self>) {
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
// wake the thread in the first case.
//
// Note that even NOTIFIED=>NOTIFIED results in a write. This is on
// purpose, to make sure every unpark() has a release-acquire ordering
// with park().
if self.state.swap(NOTIFIED, Release) == PARKED {
futex_wake(&self.state);
}
}
}