У меня есть серия модулей, использующих модель публикации / подписки RX.
Вот код регистрации события (повторяется для модуля подписки):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
Издатель прост, благодаря коду Хосе Романиелло :
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
Теперь моя проблема: как вы можете видеть выше, я использовал метод .ObserveOn (Scheduler.TaskPool) для выделения нового потока из пула для каждого модуля, для каждого события. Это потому что у меня много событий и модулей. Проблема, конечно, заключается в том, что события перемешиваются во временном порядке, поскольку некоторые события запускаются близко друг к другу, а затем в конечном итоге вызывают обратный вызов OnDataEvent в неправильном порядке (каждый OnDataEvent содержит метку времени).
Существует ли простой способ использования RX для обеспечения правильного порядка событий? Или я мог бы написать свой собственный планировщик, чтобы каждый модуль получал события в последовательности?
События публикуются, конечно, в правильной последовательности.
Заранее спасибо.