Проблема с ранней отправкой сообщений в NServiceBus 6. Пакетная отправка сообщений не работает? - PullRequest
0 голосов
/ 22 марта 2019

Мы используем NServiceBus 6 с транспортами MSMQ и RabbitMq.Когда у нас был MSMQ с распределенными транзакциями, мы никогда не отправляли сообщения раньше, чем была зафиксирована распределенная транзакция.Однако, когда мы отключили его и вручную завернули в область транзакции, мы видим, что Nsb начал отправлять сообщения до того, как закончится выполнение обработчика.Наш код выглядит так:

public Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {    
       HandleMessage1(message1);

       _session.Flush();
       tx.Commit();
    }   
}

private void HandleMessage1(ICommand1 message1)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    bus.Send<ICommand2>(x =>
    {
       ...
    });
}

Из журналов видно, что ICommand2 начинает обрабатывать раньше, чем обработчик ICommand1 смог зафиксировать изменения данных в базе данных, получив «старые» данные, прежде чем обработчик Command1 обновит их.

У меня сложилось впечатление, что у нас не будет такой проблемы, поскольку Nsb6 предоставляет нам Пакетная отправка сообщений .Похоже, что это не так для нас, мне интересно, почему и как мы можем это исправить.Я попытался запустить его под MSMQ (без дистрибутивной транзакции) и RabbitMq, результат тот же.

Обработчик Command2 работает с изменениями, внесенными обработчиком Command1, так как мы можем заставить их работать последовательно?

1 Ответ

2 голосов
/ 22 марта 2019

Отправка сообщений из обработчика должна осуществляться с использованием context, переданного в обработчик. Когда вы используете bus.Send(), скорее всего, вы используете IMessageSession вместо IMessageHandlerContext, что эквивалентно немедленной отправке.

Изменение вашего кода на следующее должно разрешить это:

public async Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {    
       await HandleMessage1(message1, context);

       _session.Flush();
       tx.Commit();
    }   
}

private async Task  HandleMessage1(ICommand1 message1, IMessageHandlerContext context)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    await context.Send<ICommand2>(x =>
    {
       ...
    });
}

...