Вызовы задачи при помощи Waker

Обычно футуры не могут завершиться сразу же, как их опросили (вызвали метод poll). Когда это случается, футура должна быть уверена, что, когда она будет готова прогрессировать,
она будет снова опрошена. Это решается при помощи типа Waker.

Каждый раз, когда футура опрашивается, она бывает частью "задачи". Задачи - это высокоуровневые футуры, с которыми работает исполнитель.

Waker обеспечивает метод wake(), который может быть использован, чтобы сказать исполнителю, что соответствующая задача должна быть пробуждена. Когда вызывается wake(), исполнитель знает, что задача, связанная с Waker, готова к выполнению, и в будущем должна быть опрошена снова.

Waker так же реализует clone(). Так вы можете его копировать где необходимо и хранить.

Давайте попробуем реализовать простой таймер с использованием Waker.

Применение: Создание таймера

В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем сообщаем о том когда время таймера истечёт.

Вот список зависимостей, которые нам понадобятся для начала:


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

Давайте определим тип нашей future. Нашей future необходим канал связи, чтобы сообщить о том что время таймера истекло и future должна завершиться. В качестве канала связи между таймером и future мы будем использовать разделяемое значение Arc<Mutex<..>>.


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
#}

Теперь давайте напишем реализацию Future!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

Просто, не так ли? Если поток установит shared_state.completed = true, мы закончили! В противном случае мы клонируем Waker для текущей задачи и сохраняем его в shared_state.waker. Так поток может разбудить задачу позже.

Важно отметить, что мы должны обновлять Waker каждый раз, когда future опрашивается, потому что future может быть перемещена в другую задачу с другим Waker. Это может произойти когда футуры передаются между задачами после опроса.

Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

Это всё, что нам нужно для того, чтобы построить простой таймер на future. Теперь нам нужен исполнитель, чтобы запустить future на исполнение.