Исполнители и системный ввод/вывод
В главе "Типаж Future
", мы обсуждали такой
пример future
, которая выполняет асинхронное чтение сокета:
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // The socket has data-- read it into a buffer and return it. Poll::Ready(self.socket.read_buf()) } else { // The socket does not yet have data. // // Arrange for `wake` to be called once data is available. // When data becomes available, `wake` will be called, and the // user of this `Future` will know to call `poll` again and // receive data. self.socket.set_readable_callback(wake); Poll::Pending } } } #}
Эта future
читает доступные данные из сокета и если доступных
данных нет, то она будет передана исполнителю с запросом
активирования задачи, если сокет снова станет читаемым. Однако,
из текущего примера не ясна реализация типа
Socket
и, в частности, не совсем очевидно как
работает функция set_readable_callback
. Как мы
можем сделать так, чтобы lw.wake()
был вызван,
когда сокет станет читаемым? Один из вариантов - иметь поток,
который постоянно проверяет стал ли socket
читаемым, вызывая при необходимости метод
wake()
. Тем не менее, такой подход будет весьма не
эффективным, так как он требует отдельного потока для каждой
блокирующей IO future
. Это значительно снизит эффективность
нашего асинхронного кода.
На практике эта проблема решается при помощи интеграции с
IO - зависимыми системными блокирующими примитивами такими,
как epoll
в Linux, kqueue
во FreeBSD и
Mac OS, IOCP
в Windows и port
в Fuchsia (все они
предоставляются при помощи кроссплатформенного Rust-пакета
mio
). Все эти примитивы позволяют потоку
заблокироваться с несколькими асинхронными IO-событиями,
возвращая одно из завершённых событий. На практике эти API
выглядят примерно так:
# #![allow(unused_variables)] #fn main() { struct IoBlocker { ... } struct Event { // ID уникально идентифицирующее событие, которое уже произошло и на которое мы подписались. id: usize, // Набор сигналов для ожидания или которые произошли. signals: Signals, } impl IoBlocker { /// Создаём новую коллекцию асинхронных IO-событий для блокировки. fn new() -> Self { ... } /// Проявим интерес к определённому IO-событию. fn add_io_event_interest( &self, /// Объект, на котором происходит событие io_object: &IoObject, /// Набор сигналов, которые могут применяться к `io_object`, /// для которых должно быть инициировано событие, в паре с /// ID, которые передадутся событиям, получившимся в результате нашего интереса. event: Event, ) { ... } /// Заблокируется до появления одного из событий. fn block(&self) -> Event { ... } } let mut io_blocker = IoBlocker::new(); io_blocker.add_io_event_interest( &socket_1, Event { id: 1, signals: READABLE }, ); io_blocker.add_io_event_interest( &socket_2, Event { id: 2, signals: READABLE | WRITABLE }, ); let event = io_blocker.block(); // выведет что-то похожее на "Socket 1 is now READABLE", если сокет станет доступным для чтения. println!("Socket {:?} is now {:?}", event.id, event.signals); #}
Исполнители Future
могут использовать эти примитивы для предоставления асинхронных объектов ввода-вывода,
таких как сокеты, которые могут настроить обратные вызовы для запуска при определённом событии ввода-вывода. В случае нашего примера SocketRead
выше,
Socket::set_readable_callback
функция может выглядеть следующим псевдокодом:
# #![allow(unused_variables)] #fn main() { impl Socket { fn set_readable_callback(&self, waker: Waker) { // `local_executor` это ссылка на локальный исполнитель. // это может быть предусмотрено при создании сокета, // большинство реализаций исполнителей делают это через локальный поток, так удобнее. let local_executor = self.local_executor; // Уникальный ID для объекта ввода вывода. let id = self.id; // Сохраним `waker` в данных исполнителя, // чтобы его можно было вызвать после того как будет получено событие. local_executor.event_map.insert(id, waker); local_executor.add_io_event_interest( &self.socket_file_descriptor, Event { id, signals: READABLE }, ); } } #}
Теперь у нас может быть только один поток исполнителя, который может принимать и отправлять любые
события ввода-вывода в соответствующий Waker
, который разбудит соответствующую
задачу, позволяющая исполнителю довести больше задач до завершения перед возвратом,
чтобы проверить больше событий ввода-вывода (и цикл продолжается...).