Reactive Extensions - создание асинхронных событий и подписка на определенные потоки - PullRequest
2 голосов
/ 08 сентября 2011

У меня есть серия модулей, использующих модель публикации / подписки RX.

Вот код регистрации события (повторяется для модуля подписки):

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(module.OnDataEvent);  

Издатель прост, благодаря коду Хосе Романиелло :

public class EventPublisher : IEventPublisher
{
    private readonly ConcurrentDictionary<Type, object> _subjects = 
        new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
        return subject.AsObservable(); 
    }
    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject; 
        if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
        { 
            ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
        }
    }
}

Теперь моя проблема: как вы можете видеть выше, я использовал метод .ObserveOn (Scheduler.TaskPool) для выделения нового потока из пула для каждого модуля, для каждого события. Это потому что у меня много событий и модулей. Проблема, конечно, заключается в том, что события перемешиваются во временном порядке, поскольку некоторые события запускаются близко друг к другу, а затем в конечном итоге вызывают обратный вызов OnDataEvent в неправильном порядке (каждый OnDataEvent содержит метку времени).

Существует ли простой способ использования RX для обеспечения правильного порядка событий? Или я мог бы написать свой собственный планировщик, чтобы каждый модуль получал события в последовательности?

События публикуются, конечно, в правильной последовательности.

Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 08 сентября 2011

Попробуйте использовать эту реализацию EventPublisher:

public class EventPublisher : IEventPublisher
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>();

    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        return _subject
            .Where(o => o is TEvent)
            .Select(o => (TEvent)o)
            .ObserveOn(_scheduler);
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

Он использует EventLoopScheduler, чтобы гарантировать, что все события происходят по порядку и в одном фоновом потоке.

Удалите ObserveOn из своей подписки, потому что, если вы наблюдаете в другом потоке, вы рискуете, что события произойдут в неправильном порядке.

Решает ли это вашу проблему?

0 голосов
/ 08 сентября 2011

Попробуйте метод синхронизации как:

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize()
    .Subscribe(module.OnDataEvent);

Хотя я попробовал ваш сценарий с тем же кодом и обнаружил, что данные поступают в последовательности и не перекрываются. Может быть, это что-то конкретное для вашего приложения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...