Я пытаюсь написать что-то похожее на Sagas, используя RX.NET.Я столкнулся с простой проблемой и не знаю, как лучше синхронизировать состояния по идентификатору корреляции.У меня есть простой EventAggregator, который прослушивает некоторые события и может получать уведомления от нескольких потоков, поэтому я не могу предположить, что подписка всегда вызывается в одном и том же потоке, поэтому существует вероятность того, что первое уведомление обновит состояние, а тем временем другое уведомление будетполучить старое состояние и работать с ним, поэтому я должен синхронизировать его.Я упростил свой сценарий до чего-то вроде ниже.Я мог бы заблокировать всю подписку с помощью некоторого зависимого идентификатора объекта, но это кажется неправильным.Что такое RX способ сделать это?
public interface IEventAggregator
{
IObservable<T> GetEvent<T>() where T : Event;
}
public class EventAggregator : IEventAggregator
{
Subject<Event> _sub = new Subject<Event>();
public IObservable<T> GetEvent<T>() where T : Event
{
return _sub.OfType<T>();
}
public void Notify<T>(T ev) where T : Event
{
_sub.OnNext(ev);
}
}
public class Event
{
public string CorrelationId { get; set; }
}
public class State
{
public int SomeValue { get; set; }
}
public interface IStateRepository
{
State Get(string id);
}
public class ProcessManager
{
private readonly IStateRepository _stateRepository;
private readonly IEventAggregator _eventAggregator;
public ProcessManager(IStateRepository stateRepository, IEventAggregator eventAggregator)
{
_stateRepository = stateRepository;
_eventAggregator = eventAggregator;
eventAggregator.GetEvent<Event>()
.Select(x => _stateRepository.Get(x.CorrelationId))
.Subscribe(state =>
{
// -do something with state like write or read;
// - state should be unique per correlation id.
// - this block should be synchronized.
});
}
}