Как реализовать асин c функцию с Windows IOCP в Rust? - PullRequest
2 голосов
/ 05 апреля 2020

В C# мы можем построить ожидаемый Объект с реализацией интерфейса INotifyCompletion.

public class MyAwaiter<T> : INotifyCompletion
{
    public bool IsCompleted { get; private set; }
    public T GetResult()
    {
        throw new NotImplementedException();
    }
    public void OnCompleted(Action continuation)
    {
        throw new NotImplementedException();
    }
}

Но в Rust я не знаю, как построить асинхронную c функцию для операции поддержки, которые в настоящее время не поддерживаются в существующих асинхронных библиотеках, такие как связь с низкоуровневыми устройствами.

Не могли бы вы привести пример самореализующейся асинхронной c функции в rust?

1 Ответ

1 голос
/ 05 апреля 2020

Вам потребуется реализовать черту Future в структуре, поэтому давайте рассмотрим определение Future в std, в частности, это .poll метод:

Когда будущее еще не готово, опрос возвращает Poll::Pending и сохраняет клон Waker, скопированный с текущего Context. Это Waker затем просыпается, как только будущее может прогрессировать Например, будущее, ожидающее, когда сокет станет читабельным, вызовет .clone() на Waker и сохранит его.

Один из способов использовать это с некоторым асинхронным механизмом, заданным ОС, будет отправить клонированный Waker недавно созданному потоку (или, в идеале, пулу потоков, в котором вы можете ставить в очередь события для пробуждения), который блокирует настроенное вами событие и вызывает wake(), когда это будет сделано .

В этом примере я использовал режим сна в потоке, но, используя Mio, как рекомендует комментатор, или напрямую IOCP, вы можете получить довольно похожий код, важным аспектом которого является просто пробуждение Waker и уведомление Future, что это произошло.

struct MyEvent {
    is_ready: Arc<AtomicBool>, // Could use a channel to transfer when the task is ready instead
    is_polled: bool,           // Prevents multiple events to get enqueued on the same future
}

impl MyEvent {
    fn new() -> Self {
        MyEvent {
            is_ready: Arc::new(AtomicBool::new(false)),
            is_polled: false,
        }
    }
}

impl Future for MyEvent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.is_ready.load(atomic::Ordering::SeqCst) {
            true => Poll::Ready(()),
            false => {
                if self.is_polled {
                    Poll::Pending
                } else {
                    let waker = cx.waker().clone();
                    let channel = Arc::clone(&self.is_ready);
                    self.get_mut().is_polled = true;
                    thread::spawn(move || {
                        // Here you block based on whatever event
                        thread::sleep(Duration::from_secs(5));
                        channel.store(true, atomic::Ordering::SeqCst);
                        waker.wake();
                    });
                    Poll::Pending
                }
            }
        }
    }
}

РЕДАКТИРОВАТЬ: Я только что заметил, что вам нужно обновлять Вакер всякий раз, когда делается новый опрос (хотя это не должно происходить с большинством исполнителей, так как они должны повторять только когда Waker проснется). Решение не тривиально, и я бы предложил читателю проверить ящик Futures как в его исходном коде, так и в предоставленных каналах (oneshot) и AtomicWaker, что должно сделать это намного проще. Если будет запрошена реальная реализация, решающая эту проблему, я попытаюсь работать над простым ПО C.

...