Синхронизация RX .NET Подписаться по CorrelationId - PullRequest
0 голосов
/ 19 сентября 2018

Я пытаюсь написать что-то похожее на Sagas, используя RX.NET.Я столкнулся с простой проблемой и не знаю, как лучше синхронизировать состояния по идентификатору корреляции.У меня есть простой EventAggregator, который прослушивает некоторые события и может получать уведомления от нескольких потоков, поэтому я не могу предположить, что подписка всегда вызывается в одном и том же потоке, поэтому существует вероятность того, что первое уведомление обновит состояние, а тем временем другое уведомление будетполучить старое состояние и работать с ним, поэтому я должен синхронизировать его.Я упростил свой сценарий до чего-то вроде ниже.Я мог бы заблокировать всю подписку с помощью некоторого зависимого идентификатора объекта, но это кажется неправильным.Что такое RX способ сделать это?

 public interface IEventAggregator
    {
        IObservable<T> GetEvent<T>() where T : Event;
    }

    public class EventAggregator : IEventAggregator
    {
        Subject<Event> _sub = new Subject<Event>();

        public IObservable<T> GetEvent<T>() where T : Event
        {
            return _sub.OfType<T>();
        }

        public void Notify<T>(T ev) where T : Event
        {
            _sub.OnNext(ev);
        }
    }

    public class Event
    {
        public string CorrelationId { get; set; }
    }

    public class State
    {
        public int SomeValue { get; set; }
    }

    public interface IStateRepository
    {
        State Get(string id);
    }

    public class ProcessManager
    {
        private readonly IStateRepository _stateRepository;
        private readonly IEventAggregator _eventAggregator;

        public ProcessManager(IStateRepository stateRepository, IEventAggregator eventAggregator)
        {
            _stateRepository = stateRepository;
            _eventAggregator = eventAggregator;

            eventAggregator.GetEvent<Event>()
             .Select(x => _stateRepository.Get(x.CorrelationId))
             .Subscribe(state =>
             {
                 // -do something with state like write or read;
                 // - state should be unique per correlation id.
                 // - this block should be synchronized.
             });
        }
    }
...