Я пытаюсь переписать некоторый код, используя Reactive Extensions для .NET, но мне нужно несколько советов о том, как достичь своей цели.
У меня есть класс, который инкапсулирует некоторое асинхронное поведение в низкоуровневой библиотеке.Подумайте, что либо читает, либо пишет в сети.Когда класс запущен, он попытается подключиться к среде, и в случае успеха он сообщит об этом, вызвав рабочий поток.
Я хочу превратить это асинхронное поведение в синхронный вызов, и я создалНиже приведен очень упрощенный пример того, как этого можно достичь:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
Выполнение AsyncStart
в рабочем потоке - это просто способ симулировать асинхронное поведение библиотеки и не является частью моего реального кода, гдебиблиотека низкого уровня предоставляет поток и вызывает мой код при обратном вызове.
Обратите внимание, что метод Start
выдает TimeoutException
, если запуск не завершен в течение интервала времени ожидания.
Я хочу переписать этот код для использования Rx.Вот моя первая попытка:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Это достойная попытка, но, к сожалению, она содержит условие гонки.Если запуск завершается fast (например, если delay
равен 0) и если в точке A имеется дополнительная задержка, то OnNext
будет вызываться на readySubject
до выполнения First
.По сути, IObservable
, который я применяю Timeout
, и First
никогда не видит, что запуск завершен, и вместо этого будет выброшен TimeoutException
.
Кажется, что Observable.Defer
был создан для обработкитакие проблемыВот несколько более сложная попытка использовать Rx:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Теперь асинхронная операция запускается не сразу, а только при использовании IObservable
.К сожалению, все еще есть состояние гонки, но на этот раз в точке B. Если асинхронная операция начала вызовы OnNext
до того, как лямбда-сигнал Defer
вернется, она все еще потеряна, и TimeoutException
будет выброшено Timeout
.
Я знаю, что могу использовать операторы типа Replay
для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации.Есть ли способ использовать Rx для решения моей проблемы без условий гонки?По сути, запуск асинхронной операции только после того, как IObservable
был подключен, в этом случае Timeout
и First
?
Основываясь на ответе Пола Беттса, вот рабочее решение:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
Интересная часть - это когда в точке C задержка больше, чем время, необходимое для завершения AsyncStart
.AsyncSubject
сохраняет последнее отправленное уведомление, а Timeout
и First
все равно будут работать как положено.