Как реализовать атомарный переход с одного IObserver на другой? - PullRequest
3 голосов
/ 14 июня 2011

У меня есть IObservable<byte[]>, который я преобразую в IObservable<XDocument>, используя несколько промежуточных шагов:

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

В какой-то момент времени меня интересуют наблюдаемые XDocument с, поэтому я подписываюсь на IObserver<XDocument>. Позже я хотел бы подписаться на другой IObserver<XDocument> и избавиться от старого.

Как я могу сделать это за одну атомарную операцию, не теряя ни одного наблюдаемого XDocument? Я мог бы сделать что-то вроде:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

Я обеспокоен тем, что между этими двумя вызовами я могу потерять XDocument. Если я переключу два вызова, может случиться так, что я получу один и тот же XDocument дважды.

Ответы [ 2 ]

6 голосов
/ 14 июня 2011

Я бы, наверное, добавил слой косвенности.Напишите класс с именем ExchangeableObserver, подпишите его на свой наблюдаемый объект и оставьте его на постоянной основе.Работа ExchangeableObserver состоит в том, чтобы делегировать все данному суб-наблюдателю.Но программисту разрешено в любое время сменить делегируемого суб-наблюдателя.В моем примере у меня есть метод Exchange ().Что-то вроде:

public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner=inner;
  }

  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    inner.OnNext(value);
  }

  public void OnCompleted() {
    inner.OnCompleted();
  }

  public void OnError(Exception error) {
    inner.OnError(error);
  }
}
1 голос
/ 14 июня 2011

вы можете использовать семафор, который гарантирует, что, пока IObservable<byte[]> готовится к IObservable<XDocument>, смена наблюдателя не происходит.

псевдокод, как это можно сделать (не тестирование)

  System.Threading.ReaderWriterLockSlim criticalSection 
       = new System.Threading.ReaderWriterLockSlim(...);  


  ... converting from `IObservable<byte[]>` to `IObservable<XDocument>`  
  criticalSection.EnterReadLock();
  Call IObservable<XDocument>
  criticalSection.ExitReadLock();

  .... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();

Редактировать: с помощью Call IObservable<XDocument>

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

Я интерпретирую ваше предложение

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

, что вы зарегистрировали обработчик событий для IObservable<byte[]>, который создает XDocument из byte[] и затем вызывает что-то, что вызывает событие для IObservable<XDocument>.

Call IObservable<XDocument> означает код, который запускает последующее событие

...