Использование саги NServicebus для сериализации выполнения длительных обработчиков конечных точек - PullRequest
0 голосов
/ 06 сентября 2018

Мы пытаемся сериализовать обработку списка бизнес-объектов с помощью Saga.

Прямо сейчас, без саги, мы просто перебираем список объектов и запускаем асинхронную bus.Send(new ProcessBusinessObejct(obj)), чтобы выполнялись обработчики. Таким образом, обработка происходит более или менее параллельно, в зависимости от этого параметра, я считаю:

endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );

Это сработало нормально, но количество одновременных обработчиков теперь сильно зависит от базы данных.

Было бы нормально запускать эти обработчики последовательно, то есть переходить к следующему только тогда, когда текущий процесс завершился (не удалось или завершился успешно). Мы не хотим устанавливать параллелизм равным 1, это повлияет на все обработчики в конечной точке.

Идея состоит в том, чтобы использовать шаблон Scatter / Gather и сагу, чтобы отслеживать количество объектов и обновлять конечный автомат с помощью счетчика (общее количество, число неудачных попыток, количество успешных попыток) и, наконец, стрельбы. событие, когда список готов / пуст.

Проблема в

А) Я не уверен, как отслеживать список в саге. SagaData нужен список для хранения всех объектов? Затем удалите экземпляр, когда обработчик сигнализирует о завершении обработки. Сага не поддерживает иерархические данные и, следовательно, не список или список. Я полагаю, что это все еще имеет место в NSB v7.

И Б) Это использование саги осуществимо или излишне, или есть гораздо более простой способ сделать это?

Мы используем Sql Server персистентность и транспорт и NSB 7.

Любой вклад приветствуется!

1 Ответ

0 голосов
/ 07 сентября 2018

Я думаю, вы хотите это сделать. Имейте в виду, что в зависимости от используемого вами уровня персистентности вам может понадобиться отделить фактический импорт от обновления состояния саги. Я писал об этом здесь .

Данные Saga также могут хранить список, но я думаю, что в большинстве сценариев вы можете избежать счета. Еще одно важное замечание (хотя это должно быть очевидно): если сообщение не обрабатывается и не попадает в очередь ошибок (например, неперехваченное исключение в ImportData), вся сага останется незавершенной, пока это сообщение не будет повторено и обработано.

public class MySaga : Saga<MySagaData>
   : IAmStartedByMessages<StartTheProcess>,
     IHandleMessages<ImportData>,
     IHandleMessages<ImportFinished>
{
    public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
    {
        Data.ObjectsToImport = message.ObjectCount;
        Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance

        foreach(var id in message.ObjectIdsToImport)
        {
            await context.SendLocal(new ImportData
            {
                JobID = Data.JobID //You need this to correlate messages back to the saga
                //Anything else you need to pass on to ImportData
                ObjectIdToImport = id
            }
        });
    }

    public async Task Handle(ImportData message, IMessageHandlerContext context)
    {
        //import the data and increment the counter
        var result = ImportData(message.ObjectIdToImport);
        if(result == Result.Success)
        {
            Data.SuccessImport++;
        }
        else
        {
            Data.FailedImport++;
        }

        await CheckIfFinished(context);
    }

    public async Task Handle(ImportFinished message, IMessageHandlerContext context)
    {
        //do any post cleanups or Mark as complete 
        MarkAsComplete();
        return Task.CompletedTask;
    }

    private async Task CheckIfFinished(IMessageHandlerContext context)
    {
        if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
        {
            //Everything is done
            context.SendLocal(new ImportFinished { JobID = Data.JobID });
        }
    }
}
...