Использование события saga для реагирования на сообщение, опубликованное пользователем - PullRequest
0 голосов
/ 08 марта 2019

Я собираю подтверждение концепции, используя Mass Transit с RabbitMq и Automatonymous в приложении asp.net core 2.1.Я использую ядро ​​EntityFramework с Postgres для сохранения.

Я пытаюсь запустить сагу, когда делается запрос к http rest api, и вернуть результат, как только сага завершится.Что я делаю:

  • подключите событие, чтобы запустить мою сагу, используя интерфейс с клиентом запроса / ответа
  • , в саге опубликуйте сообщение, которое потребляетсяпотребитель
  • в потребитель публикует сообщение, соответствующее другому событию в моей саге
  • возвращает ответ из моей саги по завершении и завершает

Это мойкод:

мои интерфейсы

public interface IStartSagaRequest
{
    Guid CorrelationId { get; set; }
    string Name {get; set;}
}

public interface IStartSagaResponse
{
    Guid CorrelationId { get; set; }
    bool DidComplete {get; set;}
}

public IDoOperationRequest
{
    Guid CorrelationId { get; set; }
}

public IOperationComplete
{
    Guid CorrelationId { get; set; }
    bool OperationSuccessful {get; set;}
}

Мой экземпляр саги

public class DoOperationSaga : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public Name { get; set; }
    public string CurrentState { get; set; }
}

конкретная реализация IDoOperationRequest, используемая для публикации в конечном автомате

public class DoOperationRequestImpl : IDoOperationRequest
{
    public Guid CorrelationId { get; set; }
}

конкретная реализация IStartSagaResponse, используемая для публикации в конечном автомате

public class StartSagaResponse : IStartSagaResponse
{
    public Guid CorrelationId { get; set; }
    public bool DidComplete {get; set;}
}

Мой конечный автомат

public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
    public State OperationPending { get; private set; }
    public State Complete { get; private set; }


    public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
    public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }


    public ProcessOperationStateMachine()
    {
        InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);

        Event(() => StartSagaRequestEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                    context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
        });

        Event(() => OperationCompleteEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                context => context.Message.CorrelationId);
        });


        Initially(
            When(StartSagaRequestEvent)
                .Then(context =>
                {
                    context.Instance.CorrelationId = context.Data.CorrelationId;
                    context.Instance.Name = context.Data.Name;
                    context.Publish(new DoOperationRequestImpl
                    {
                        CorrelationId = context.Data.CorrelationId
                    });

                })
                .TransitionTo(OperationPending)
        );

        During(OperationPending,
            When(OperationCompleteEvent)
                .Then(context =>
                {
                    // I'm just doing this for debugging
                    context.Instance.Name = "changed in operationComplete";
                })
                .ThenAsync(context => context.RespondAsync(new StartSagaResponse 
                { 
                    CorrelationId = context.Data.CorrelationId,
                    DidComplete = true
                }))
                .Finalize());

}

Мой потребитель:

public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{

    public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
    {
       await context.Publish<IOperationComplete>(new
       {
          CorrelationId = context.Message.CorrelationId,
          OperationSuccessful = true
       });
    }
}

Как я подключаю вещи вDI в Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    stateMachine = new ProcessOperationStateMachine();

    SagaDbContextFactory factory = new SagaDbContextFactory();
    EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);

    services.AddMassTransit(x =>
    {

        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            sbc.ReceiveEndpoint(host, "do-operation", ep =>
            {
                ep.UseMessageRetry(c => c.Interval(2, 100));
                ep.StateMachineSaga(stateMachine, repository);
                ep.Durable = false;
            });

            sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
            {
                ep.Consumer(() => new DoOperationRequestConsumer());
                ep.Durable = false;
            });
        }));
        x.AddConsumer<DoOperationRequestConsumer>();
    });

    services.AddScoped<DoOperationRequestConsumer>();

    services.AddScoped(p =>
        p.GetRequiredService<IBusControl>()
            .CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
                new Uri("rabbitmq://localhost/do-operation?durable=false"),
                TimeSpan.FromSeconds(30)));

}

и выполнение запроса в моем контроллере:

public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
    Name = "from the controller",
    CorrelationId = guid
});

Я вижу, что мой конечный автомат запускается.Когда (StartSagaRequestEvent) получает удар и публикуется сообщение DoOperationRequest.DoOperationRequestConsumer получает сообщение и публикует сообщение IOperationComplete.Однако на этом все и заканчивается.Мой IOperationCompleteEvent в моей машине состояний не вызывается.Когда я просматриваю базу данных, я вижу, что мой экземпляр саги создается с помощью guid, а для CurrentState установлено значение OperationPending.Когда я смотрю на свою панель управления rabbitmq, я вижу сообщение, опубликованное после того, как мой DoOperationRequestConsumer выполнил публикацию сообщения IOperationComplete.Я просто не вижу, чтобы конечный автомат потреблял сообщение IOperationComplete, опубликованное потребителем.Когда я устанавливаю точку останова и проверяю сообщение в Consumer, я вижу, что CorrelationId установлен на то же значение CorrelationId саги.

Я также пытался явно использовать очередь "do-operation" в получателе:

public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
    ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));

    await sendEndpoint.Send<IOperationComplete>(new
    {
      CorrelationId = context.Message.CorrelationId,
      OperationSuccessful = true
    });
}

, но все еще не смог установить соединение.

Я весь день бился об это и не уверен, что мне здесь не хватает.Если бы кто-нибудь мог дать мне несколько советов о том, что я могу делать неправильно, я был бы очень признателен, опять же извините за стену текста, я знаю, что это достаточно для чтения, но я хотел уточнить, что я делал.Большое спасибо!

1 Ответ

2 голосов
/ 09 марта 2019

Ваш корреляционный идентификатор события кажется подозрительным, он должен выглядеть следующим образом:

Event(() => StartSagaRequestEvent, eventConfigurator =>
{
    eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
        .SelectId(context => context.Message.CorrelationId);
});

Таким образом, он инициализируется в CorrelationId сообщения.

Не связано, но ваша конечная точка должна использовать метод расширения для вашего контейнера:

sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
    ep.ConfigureConsumer<DoOperationRequestConsumer>();
    ep.Durable = false;
});

И используйте новый клиентский запрос, настроив его также в расширениях.

x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));

Кроме того, в вашем начальном состоянии эта строка должна быть удалена:

context.Instance.CorrelationId = context.Data.CorrelationId;
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...