Использование Rx для блокировки (и, возможно, тайм-аут) в асинхронной операции - PullRequest
9 голосов
/ 17 января 2011

Я пытаюсь переписать некоторый код, используя Reactive Extensions для .NET, но мне нужно несколько советов о том, как достичь своей цели.

У меня есть класс, который инкапсулирует некоторое асинхронное поведение в низкоуровневой библиотеке.Подумайте, что либо читает, либо пишет в сети.Когда класс запущен, он попытается подключиться к среде, и в случае успеха он сообщит об этом, вызвав рабочий поток.

Я хочу превратить это асинхронное поведение в синхронный вызов, и я создалНиже приведен очень упрощенный пример того, как этого можно достичь:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

Выполнение AsyncStart в рабочем потоке - это просто способ симулировать асинхронное поведение библиотеки и не является частью моего реального кода, гдебиблиотека низкого уровня предоставляет поток и вызывает мой код при обратном вызове.

Обратите внимание, что метод Start выдает TimeoutException, если запуск не завершен в течение интервала времени ожидания.

Я хочу переписать этот код для использования Rx.Вот моя первая попытка:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Это достойная попытка, но, к сожалению, она содержит условие гонки.Если запуск завершается fast (например, если delay равен 0) и если в точке A имеется дополнительная задержка, то OnNext будет вызываться на readySubject до выполнения First.По сути, IObservable, который я применяю Timeout, и First никогда не видит, что запуск завершен, и вместо этого будет выброшен TimeoutException.

Кажется, что Observable.Defer был создан для обработкитакие проблемыВот несколько более сложная попытка использовать Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Теперь асинхронная операция запускается не сразу, а только при использовании IObservable.К сожалению, все еще есть состояние гонки, но на этот раз в точке B. Если асинхронная операция начала вызовы OnNext до того, как лямбда-сигнал Defer вернется, она все еще потеряна, и TimeoutException будет выброшено Timeout.

Я знаю, что могу использовать операторы типа Replay для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации.Есть ли способ использовать Rx для решения моей проблемы без условий гонки?По сути, запуск асинхронной операции только после того, как IObservable был подключен, в этом случае Timeout и First?


Основываясь на ответе Пола Беттса, вот рабочее решение:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

Интересная часть - это когда в точке C задержка больше, чем время, необходимое для завершения AsyncStart.AsyncSubject сохраняет последнее отправленное уведомление, а Timeout и First все равно будут работать как положено.

Ответы [ 2 ]

12 голосов
/ 18 января 2011

Итак, одна вещь, которую нужно знать о Rx, я думаю, что многие люди сначала делают (включая меня!): Если вы используете какую-либо традиционную функцию потоков, такую ​​как ResetEvents, Thread.Sleeps или что-то еще, вы делаете это Неверно (tm) - это все равно что приводить вещи к массивам в LINQ, потому что вы знаете, что базовый тип - это массив.

Главное, что нужно знать, это то, что асинхронная функция представлена ​​функцией, которая возвращает IObservable<TResult> - это волшебный соус, который позволяет вам сигнализировать, когда что-то завершено. Итак, вот как вы могли бы «Rx-ify» использовать более традиционную асинхронную функцию, как в веб-сервисе Silverlight:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

Это достойная попытка, но, к сожалению, она содержит условие гонки.

Это то место, куда приходит AsyncSubject - это гарантирует, что даже если asyncReaderFunc превзойдет «Подписаться на удар», AsyncSubject все равно «воспроизведет» то, что произошло.

Итак, теперь, когда у нас есть наша функция, мы можем сделать с ней много интересного:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();
4 голосов
/ 03 октября 2012

Я также добавил бы к комментарию Пола о том, что WaitHandles означает, что вы делаете это неправильно, а прямое использование Subjects обычно означает, что вы тоже делаете это неправильно.; -)

Попробуйте рассмотреть ваш код Rx, работающий с последовательностями или конвейерами.Субъекты предлагают возможности чтения и записи, что означает, что вы больше не работаете с конвейером или последовательностью (если у вас нет пиплиний, которые идут обоими способами или последовательностями, которые могут повернуть вспять?!?)

Итак, сначала код Пола довольно хорошкруто, но давайте "Rx к черту из этого".

1-й Метод AsyncStart изменить его на

IObservable<Unit> AsyncStart(TimeSpan delay) 
{
  Observable.Timer(delay).Select(_=>Unit.Default);
}

Так просто!Смотри без предметов и данные только текут в одну сторону.Здесь важно поменять подпись.Это подтолкнет нас к чему-то.Это сейчас очень явно.Переход в Субъект для меня очень неоднозначен.

2nd .Теперь нам не нужен субъект, определенный в методе start.Мы также можем использовать функции планировщика вместо старого skool ThreadPool.QueueUserWorkItem.

void Start(TimeSpan timeout) 
{
    var isReady = AsyncStart(TimeSpan.FromSeconds(1))
                    .SubscribeOn(Scheduler.ThreadPool)
                    .PublishLast();
    isReady.Connect();
    isReady.Timeout(timeout).First();
}

Теперь у нас есть понятный конвейер или последовательность событий

AsyncStart -> isReady ->Start

Вместо Start -> AsyncStart -> Start

Если бы я знал больше о вашей проблемной области, я уверен, что мы могли бы придумать еще лучший способ сделать это, который сделалне требует блокирующего характера метода запуска.Чем больше вы используете Rx, тем больше вы обнаружите, что ваши старые предположения о том, когда вам нужно заблокировать, использовать ручки ожидания и т. Д., Могут быть выброшены из окна.

...