Мы следовали этому примеру (http://masstransit -project.com / MassTransit / использовании / azure-functions.html ), чтобы попытаться настроить функции Azure в качестве подписчиков события (раздела) служебной шины Azure с помощью MassTransit(для .Net CORE 2.1, Функции Azure 2.0).
При использовании веб-заданий Azure это так же просто, как с использованием RabbitMQ, настройте издателя, разрешите подписчику настраивать и настраивать свою очередь, а Masstransit автоматически создает ее.тема для каждого события, перенаправление в очередь и в «queue_error» после неудачной попытки.Вам не нужно ничего настраивать вручную.
Но с помощью функций Azure мы, кажется, вручную (через Service Bus Explorer или шаблоны ARM) должны добавить подписчиков в тему (которая создается издателем при первом опубликованном событии) и очереди также(хотя они даже не кажутся необходимыми, события обрабатываются непосредственно подписчиками темы-функции Azure.).
Возможно, мы делаем что-то не так, из документов я не вижу, что MT не будет, как обычно, настраивать подписчика и создавать очереди при использовании функций Azure.Но это работает, за исключением случаев, когда потребитель генерирует исключение и после того, как все попытки установки были выполнены.Мы просто не получаем событие в очереди недоставленного сообщения, и обычно генерируемая MT очередь ошибок даже не генерируется.
Итак, как мы можем получить MT для создания очередей ошибок и ПЕРЕДАЧИ неудачных событий там?
Наш код:
[FunctionName("OrderShippedConsumer")]
public static Task OrderShippedConsumer(
[ServiceBusTrigger("xyz.events.order/iordershipped", "ordershippedconsumer-queue", Connection = "AzureServiceBus")] Message message,
IBinder binder,
ILogger logger,
CancellationToken cancellationToken,
ExecutionContext context)
{
var config = CreateConfig(context);
var handler = Bus.Factory.CreateBrokeredMessageReceiver(binder, cfg =>
{
var serviceBusEndpoint = Parse.ConnectionString(config["AzureServiceBus"])["Endpoint"];
cfg.CancellationToken = cancellationToken;
cfg.SetLog(logger);
cfg.InputAddress = new Uri($"{serviceBusEndpoint}{QueueName}");
cfg.UseRetry(x => x.Intervals(TimeSpan.FromSeconds(5)));
cfg.Consumer(() => new OrderShippedConsumer(cfg.Log, config));
});
return handler.Handle(message);
}
И код потребителя:
public OrderShippedConsumer(ILog log, IConfigurationRoot config)
{
this.config = config;
this.log = log;
}
public async Task Consume(ConsumeContext<IOrderShipped> context)
{
// Handle the event
}
}