Сага MassTransit, использующая AzureServiceBus, молча терпит неудачу - PullRequest
0 голосов
/ 14 января 2019

Я пытаюсь создать сагу MassTransit, используя AzureServiceBus и .NET Core.

У меня есть приложение ASP.NET Core, которое успешно отправляет сообщения в очередь. Он имеет такую ​​конфигурацию в Startup.cs:

services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    cfg.Host(
        new Uri("https://zzz.servicebus.windows.net/"),
        h =>
        {
            h.TransportType = TransportType.AmqpWebSockets;
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.RetryLimit = 1;
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
        });

    cfg.RequiresSession = true;
}));

EndpointConvention.Map<MassTransit.POC.Shared.IPurchasePolicyMessage>(
    new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());

и этот код в контроллере, отправляющем сообщение:

var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

await
  sendEndpoint.Send<IPurchasePolicyMessage>(
    new
    {
      QuoteNumber = policyNumber,
      CorrelationId = NewId.NextGuid().ToString("D")
    }, context =>
    {
      context.SetSessionId(context.Message.CorrelationId.ToString());
    }).ConfigureAwait(false);

У меня есть отдельное консольное приложение .NET Core, которое получает IPurchasePolicyMessage. Он имеет такую ​​конфигурацию в Program.cs:

var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    var host = cfg.Host(
        new Uri("https://zzz.servicebus.windows.net/"),
        h =>
        {
            h.TransportType = TransportType.AmqpWebSockets;
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.RetryLimit = 1;
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
        });

    cfg.RequiresSession = true;

    cfg.ReceiveEndpoint(host, "masstransitqueue", e =>
    {
        e.RequiresSession = true;
        //e.Consumer<PurchasePolicyConsumer>();
        e.Saga<PurchasePolicySaga>(new MessageSessionSagaRepository<PurchasePolicySaga>());
    });
});

bus.Start();

и PurchasePolicySaga определяется как:

public class PurchasePolicySaga :
    ISaga,
    InitiatedBy<IPurchasePolicyMessage>
{
    public Guid CorrelationId { get; set; }

    public async Task Consume(ConsumeContext<IPurchasePolicyMessage> context)
    {
        await Console.Out.WriteLineAsync($"Processing policy number {context.Message.QuoteNumber} in saga.");
    }
}

Метод Потребления здесь никогда не вызывается. Нет ошибок, нет активности, связанной с входящим сообщением в логах, просто ничего не происходит. Может кто-нибудь подскажите пожалуйста, как найти почему?

Я сильно подозреваю, что проблема связана с сессиями, так как, когда я изменяю свою сагу на простого потребителя, удаляю флаги "требовать сеанса" и удаляю очередь, чтобы MassTransit мог воссоздать ее, это работает. Однако, поскольку сеансы требуются для саги MassTransit на основе AzureServiceBus, я немного застрял.

Дальнейшее расследование

Глядя на сообщения на информационной панели Azure, я вижу, что сообщения отправляются в очередь. Этот скриншот показывает 3 сообщения:

azure queue screenshot

Однако по теме нет сообщений:

azure topic screenshot

Я должен ожидать соответствующих сообщений по теме, не так ли? Кроме того, кажется неправильным, что в подписке отключены сеансы?

1 Ответ

0 голосов
/ 15 января 2019

Чтобы использовать сеансы с Azure Service Bus, необходимо установить SessionId отправленного / опубликованного сообщения, чтобы Azure создал для него сеанс.

await InputQueueSendEndpoint.Send(message, context =>
{
    context.SetSessionId(message.CorrelationId.ToString());
});

Вы можете увидеть пример теста работающего модуля здесь:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.AzureServiceBusTransport.Tests/Session_Specs.cs

...