Rebus Sagas, редакции и отложенные сообщения - PullRequest
0 голосов
/ 21 мая 2018

Я пытаюсь настроить Saga, который работает следующим образом:

  1. Saga получает сообщение с заказом на доставку.Этот заказ на отправку имеет свойство RouteId, которое я могу использовать для сопоставления заказов на отправку для того же «грузовика»
  2. Эти заказы на отправку создаются другой системой, которая может использовать пакетный процесс для отправки этих заказов.Но эта система не может сгруппировать заказы на доставку по тому же адресу.
  3. Через некоторое количество секунд я отправил еще одно сообщение только с этим RouteId.Мне нужно захватить все заказы на доставку полученного RouteId, сгруппировать их по адресу, а затем перевести его в другой объект и отправить в другой веб-сервис.

Но я сталкиваюсь с двумя проблемами:

  1. Если я отправляю два сообщения «одновременно» первому обработчику, каждое приходит и даже со свойствами, которые коррелируют с этими сообщениями, свойство IsNew не изменяется после обработки первого сообщения
  2. Во втором обработчике я хочу получить доступ ко всем данным, связанным с этими сагами, но я не могу, потому что данные, по-видимому, являются данными, которые были в пересмотре этих сообщений, были отложены.

Соответствующий код:

Конфигурация шины для саги

Bus = Configure.With(Activator)
   .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
   .Logging(l => l.ColoredConsole())
   .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
   .Sagas(s => {
       s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
          if (enforceExclusiveAccess)
          {
              s.EnforceExclusiveAccess();
          }
       })
   .Options(o =>
       {
         if (maxDegreeOfParallelism > 0)
         {
            o.SetMaxParallelism(maxDegreeOfParallelism);
         }
         if (maxNumberOfWorkers > 0)
         {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
         }
      })
   .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
   .Start();

Класс SagaData:

public class RouteListSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    public long RoutePlanId { get; set; }

    public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
    public bool SentToLisa { get; set; }

    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
        {
            _shippingActivities.Add(shippingActivity);
        }
    }

    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}

Метод CorrelateMessages

protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
    config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}

Дескриптор сообщениякоторые предлагают инициировать сагу и отправить DefferedMessage, если сага IsNew

public async Task Handle(ShippingOrder message)
{
  try
  {
    var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);

    if (Data.ShippingActivities.Contains(lisaActivity))
      return;

    Data.RoutePlanId = message.RoutePlanId;
    Data.AddShippingActivity(lisaActivity);
    var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);

    if (IsNew)
    {
      await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
    }
 }
 catch (Exception err)
 {
   Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
   throw;
 }
}

И, наконец, обработчик для отложенного сообщения:

public Task Handle(VerifyRouteListIsComplete message)
{
  try
  {
    if (!Data.SentToLisa)
    {
      var lisaData = Data.GroupShippingActivitiesToLisaActivities();

      _lisaService.SyncRouteList(lisaData).Wait();

      Data.SentToLisa = true;
    }
    MarkAsComplete();
    return Task.CompletedTask;
  }
  catch (Exception err)
  {
    Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
    _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
    MarkAsUnchanged();
    return Task.CompletedTask;
  }
}

Любая помощь приветствуется!

1 Ответ

0 голосов
/ 21 мая 2018

Я не уверен, что правильно понимаю симптомы, которые вы испытываете.

Если я отправляю два сообщения «одновременно» первому обработчику, каждое приходит и даже сВ свойствах, которые коррелируют с этими сообщениями, свойство IsNew не изменяется после первого обработанного сообщения

Если вызывается EnforceExclusiveAccess, я ожидаю, что сообщения будут обрабатываться последовательно, первое с IsNew == true и второе с IsNew == false.

Если нет, то я ожидаю, что оба сообщения будут обрабатываться параллельно с IsNew == true, но затем - когда вставляются данные мудреца - я ожидаю одно изчтобы они преуспели, а другой потерпел неудачу с ConcurrencyException.

После ConcurrencyException сообщение было бы обработано снова, на этот раз с IsNew == false.

Разве это нечто вы испытываете?

Во втором обработчике я хочу получить доступ ко всем данным, относящимся к этим сагам, но я не могу, потому что эти данные кажутся такими, какими они были в ревизиииз этих сообщенийas deferred.

Вы говорите, что данные в саге, кажется, находятся в состоянии, в котором они находились, когда сообщение VerifyRouteListIsComplete было отложено?

Звучит действительностранно, а также довольно маловероятно - не могли бы вы попробовать еще раз и посмотреть, действительно ли это так?


ОБНОВЛЕНИЕ: Я выяснил, почему вы испытываете это странное поведение: вы случайно настроили свою сагуЭкземпляр обработчика для повторного использования в сообщениях.

Вы сделали это, зарегистрировав его следующим образом (ВНИМАНИЕ: Не делайте этого!):

_sagaHandler = new ShippingOrderSagaHandler(_subscriber);

_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);

где метод Subscribeзатем делает этот вызов на BuiltinHandlerActivator (ВНИМАНИЕ: не делайте этого!):

activator.Register(() => handlerInstance);

Эта причина, почему это плохо (особенно для обработчика саги), заключается в том, что сам экземпляр обработчика является состоящим- у него есть свойство Data, содержащее текущее состояние процесса, а также свойство IsNew.

То, что вы должны ВСЕГДА делать, - это гарантировать, что каждый раз создается новый экземпляр обработчика.приходит сообщениев - ваш код должен быть изменен на что-то вроде этого:

_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();

, что может быть сделано, если реализация Subscribe изменится на это:

public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
    _activator.Register((bus, context) => getHandler());
    await _activator.Bus.Subscribe<T>();
}

Это решит вашу проблему.проблема монопольного доступа:)

Есть еще одна проблема с вашим кодом: у вас есть потенциальная возможность состязания между регистрацией вашего обработчика и запуском экземпляра абонентской шины, потому что в теории вы можете быть неудачливы и начать получать сообщения между шинойначинается, и вы регистрируете свой обработчик.

Вы должны изменить свой код, чтобы убедиться, что все обработчики зарегистрированы до того, как вы запустите шину (и, следовательно, начнете получать сообщения).

...