Я не уверен, что правильно понимаю симптомы, которые вы испытываете.
Если я отправляю два сообщения «одновременно» первому обработчику, каждое приходит и даже сВ свойствах, которые коррелируют с этими сообщениями, свойство IsNew не изменяется после первого обработанного сообщения
Если вызывается EnforceExclusiveAccess
, я ожидаю, что сообщения будут обрабатываться последовательно, первое с IsNew == true
и второе с IsNew == false
.
Если нет, то я ожидаю, что оба сообщения будут обрабатываться параллельно с IsNew == true
, но затем - когда вставляются данные мудреца - я ожидаю одно изчтобы они преуспели, а другой потерпел неудачу с ConcurrencyException
.
После ConcurrencyException
сообщение было бы обработано снова, на этот раз с IsNew == false
.
Разве это нечто вы испытываете?
Во втором обработчике я хочу получить доступ ко всем данным, относящимся к этим сагам, но я не могу, потому что эти данные кажутся такими, какими они были в ревизиииз этих сообщенийas deferred.
Вы говорите, что данные в саге, кажется, находятся в состоянии, в котором они находились, когда сообщение VerifyRouteListIsComplete
было отложено?
Звучит действительностранно, а также довольно маловероятно - не могли бы вы попробовать еще раз и посмотреть, действительно ли это так?
ОБНОВЛЕНИЕ: Я выяснил, почему вы испытываете это странное поведение: вы случайно настроили свою сагуЭкземпляр обработчика для повторного использования в сообщениях.
Вы сделали это, зарегистрировав его следующим образом (ВНИМАНИЕ: Не делайте этого!):
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);
_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);
где метод Subscribe
затем делает этот вызов на BuiltinHandlerActivator
(ВНИМАНИЕ: не делайте этого!):
activator.Register(() => handlerInstance);
Эта причина, почему это плохо (особенно для обработчика саги), заключается в том, что сам экземпляр обработчика является состоящим- у него есть свойство Data
, содержащее текущее состояние процесса, а также свойство IsNew
.
То, что вы должны ВСЕГДА делать, - это гарантировать, что каждый раз создается новый экземпляр обработчика.приходит сообщениев - ваш код должен быть изменен на что-то вроде этого:
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
, что может быть сделано, если реализация Subscribe
изменится на это:
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
_activator.Register((bus, context) => getHandler());
await _activator.Bus.Subscribe<T>();
}
Это решит вашу проблему.проблема монопольного доступа:)
Есть еще одна проблема с вашим кодом: у вас есть потенциальная возможность состязания между регистрацией вашего обработчика и запуском экземпляра абонентской шины, потому что в теории вы можете быть неудачливы и начать получать сообщения между шинойначинается, и вы регистрируете свой обработчик.
Вы должны изменить свой код, чтобы убедиться, что все обработчики зарегистрированы до того, как вы запустите шину (и, следовательно, начнете получать сообщения).