Я пытаюсь создать сагу MassTransit, используя AzureServiceBus и .NET Core.
У меня есть приложение ASP.NET Core, которое успешно отправляет сообщения в очередь. Он имеет такую конфигурацию в Startup.cs:
services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host(
new Uri("https://zzz.servicebus.windows.net/"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 1;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
});
cfg.RequiresSession = true;
}));
EndpointConvention.Map<MassTransit.POC.Shared.IPurchasePolicyMessage>(
new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
и этот код в контроллере, отправляющем сообщение:
var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));
await
sendEndpoint.Send<IPurchasePolicyMessage>(
new
{
QuoteNumber = policyNumber,
CorrelationId = NewId.NextGuid().ToString("D")
}, context =>
{
context.SetSessionId(context.Message.CorrelationId.ToString());
}).ConfigureAwait(false);
У меня есть отдельное консольное приложение .NET Core, которое получает IPurchasePolicyMessage. Он имеет такую конфигурацию в Program.cs:
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(
new Uri("https://zzz.servicebus.windows.net/"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 1;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
});
cfg.RequiresSession = true;
cfg.ReceiveEndpoint(host, "masstransitqueue", e =>
{
e.RequiresSession = true;
//e.Consumer<PurchasePolicyConsumer>();
e.Saga<PurchasePolicySaga>(new MessageSessionSagaRepository<PurchasePolicySaga>());
});
});
bus.Start();
и PurchasePolicySaga определяется как:
public class PurchasePolicySaga :
ISaga,
InitiatedBy<IPurchasePolicyMessage>
{
public Guid CorrelationId { get; set; }
public async Task Consume(ConsumeContext<IPurchasePolicyMessage> context)
{
await Console.Out.WriteLineAsync($"Processing policy number {context.Message.QuoteNumber} in saga.");
}
}
Метод Потребления здесь никогда не вызывается. Нет ошибок, нет активности, связанной с входящим сообщением в логах, просто ничего не происходит. Может кто-нибудь подскажите пожалуйста, как найти почему?
Я сильно подозреваю, что проблема связана с сессиями, так как, когда я изменяю свою сагу на простого потребителя, удаляю флаги "требовать сеанса" и удаляю очередь, чтобы MassTransit мог воссоздать ее, это работает. Однако, поскольку сеансы требуются для саги MassTransit на основе AzureServiceBus, я немного застрял.
Дальнейшее расследование
Глядя на сообщения на информационной панели Azure, я вижу, что сообщения отправляются в очередь. Этот скриншот показывает 3 сообщения:
Однако по теме нет сообщений:
Я должен ожидать соответствующих сообщений по теме, не так ли? Кроме того, кажется неправильным, что в подписке отключены сеансы?