MassTransit: при использовании UseDelayedExchangeMessageScheduler сообщения, отправленные с ScheduleSend, попадают в пропущенную очередь - PullRequest
1 голос
/ 19 апреля 2020

Я пытаюсь отправить запланированное сообщение, используя UseDelayedExchangeMessageScheduler вместе с плагином rabbitmq_delayed_message_exchange. Я настраиваю шину так:

    public void StartUpBus()
    {
        _bus = Bus.Factory.CreateUsingRabbitMq(ConfigureBus);
        _bus.Start();
    }

    private void ConfigureBus(IRabbitMqBusFactoryConfigurator busConfigurator)
    {
        var host = busConfigurator.Host(new Uri(_connectionInfo.ConnectionString), h =>
            {
                h.Username(_connectionInfo.User);
                h.Password(_connectionInfo.Password);
            });

        busConfigurator.UseDelayedExchangeMessageScheduler();

        busConfigurator.ReceiveEndpoint(host, "schedule-send-endpoint.inbox", endpoint => {

            endpoint.PrefetchCount = 1;

            endpoint.Consumer( () => new AScheduledConsumer() );
        });

    }

Потребитель прост, как этот

public class AScheduledConsumer : IConsumer<AScheduledMessage>
{
    public Task Consume(ConsumeContext<AScheduledMessage> context)
    {
        return Console.Out.WriteLineAsync($"Message received at {DateTime.Now}");
    }
}

Вот сообщение, которое я хотел бы отправить

// Simple role interface to easily identify bus travelling data
public interface IMessage 
{
}

public class AScheduledMessage : IMessage
{
}

Затем я пытаюсь отправить сообщение с чем-то вроде этого

        var message = new AScheduledMessage();
        var delay = TimeSpan.FromMinutes(1);
        Uri destinationUri = new Uri("rabbitmq://localhost/schedule-send-endpoint.inbox");
        await Console.Out.WriteLineAsync($"Message sent at {DateTime.Now}");
        return _bus.ScheduleSend(
            destinationUri,
            delay,
            message
        );

Я ожидал, что сообщение будет доставлено примерно через минуту после отправки, но сообщение заканчивается в очереди _skipped. Вот после сообщения, которое выводится функцией «Получить сообщение» в пользовательском интерфейсе управления RabbitMq.

The server reported 0 messages remaining.

Exchange        schedule-send-endpoint.inbox_skipped
Routing Key 
Redelivered     ○
Properties  
message_id:     d7040000-4392-98e7-c8e1-08d7e3d61e54
correlation_id: d7040000-4392-98e7-cc70-08d7e3d61e49
delivery_mode:  2
headers:    
Content-Type:       application/vnd.masstransit+json
MT-Host-Assembly:   Infrastructure.Messaging.RabbitMq.Test.ConsoleApp
MT-Host-AssemblyVersion:    1.0.0.0
MT-Host-FrameworkVersion:   4.0.30319.42000
MT-Host-MachineName:        GABROS-NB
MT-Host-MassTransitVersion: 5.2.1.1808
MT-Host-OperatingSystemVersion: Microsoft Windows NT 6.2.9200.0
MT-Host-ProcessId:      26984
MT-Host-ProcessName:    Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost
MT-Reason:              dead-letter
infrastructure.correlation-id:  029ea5c6-e5ee-44b7-8851-84d3b6ebd191
infrastructure.user-id: anonymous
publishId:  1
content_type:   application/vnd.masstransit+json
Payload
1649 bytes
Encoding: string
{
  "messageId": "d7040000-4392-98e7-c8e1-08d7e3d61e54",
  "correlationId": "d7040000-4392-98e7-cc70-08d7e3d61e49",
  "conversationId": "d7040000-4392-98e7-29ed-08d7e3d61e5d",
  "sourceAddress": "rabbitmq://localhost/bus-GABROS-NB-Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost-4hnyyynd1kcqqhmibdm68io7fu?durable=false&autodelete=true",
  "destinationAddress": "rabbitmq://localhost/schedule-send-endpoint.inbox",
  "messageType": [
    "urn:message:MassTransit.Scheduling:ScheduleMessage[[Infrastructure.Messaging.Test:AScheduledMessage]]",
    "urn:message:MassTransit.Scheduling:ScheduleMessage"
  ],
  "message": {
    "correlationId": "d7040000-4392-98e7-cc70-08d7e3d61e49",
    "scheduledTime": "2020-04-18T20:21:47.1796308Z",
    "payloadType": [
      "urn:message:Infrastructure.Messaging.Test:AScheduledMessage",
      "urn:message:Infrastructure.Messaging:IMessage"
    ],
    "destination": "rabbitmq://localhost/schedule-send-endpoint.inbox",
    "payload": {}
  },
  "sentTime": "2020-04-18T20:21:46.8178828Z",
  "headers": { },
  "host": {
    "machineName": "GABROS-NB",
    "processName": "Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost",
    "processId": 26984,
    "assembly": "Infrastructure.Messaging.RabbitMq.Test.ConsoleApp",
    "assemblyVersion": "1.0.0.0",
    "frameworkVersion": "4.0.30319.42000",
    "massTransitVersion": "5.2.1.1808",
    "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
  }
}

И это обмены, которые я нахожу в RabbitMQ

Name                                                                                                Type    Features    Message rate in Message rate out+/-
(AMQP default)                                                                                      direct  D       
Infrastructure.Messaging.Test:AScheduledMessage                                                     fanout  D       
amq.direct                                                                                          direct  D       
amq.fanout                                                                                          fanout  D       
amq.headers                                                                                         headers D       
amq.match                                                                                           headers D       
amq.rabbitmq.trace                                                                                  topic   D I     
amq.topic                                                                                           topic   D       
bus-GABROS-NB-Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost-4hnyyynd1kcqqysnbdm6jy77ny   fanout  AD      
schedule-send-endpoint.inbox                                                                        fanout  D           0.00/s          0.00/s
schedule-send-endpoint.inbox_skipped                                                                fanout  D           0.00/s          0.00/s

Что Я нахожу странным полное отсутствие заголовка x-delay в сообщении и отсутствие Exchange-сообщения x-delayed-сообщения, созданного в rabbitMq, как если бы UseDelayedExchangeMessageScheduler был полностью проигнорирован ... Я думаю, что я делаю что-то не так, но действительно не могу найти виновного!

--- ОБНОВЛЕНИЕ ---

Как указал @ChrisPatterson, сообщение должно быть отправлено с MessageScheduler ,

Хотя решение MassTransit 5.2.1 пока не найдено, этот код работает с использованием MassTransit v6.4.2 и выполняет tnet core 2:

var message = new AScheduledMessage();
var delay = TimeSpan.FromMinutes(1);
Uri destinationUri = new Uri("rabbitmq://localhost/schedule-send-endpoint.inbox");

var ms = new MessageScheduler(new DelayedExchangeScheduleMessageProvider(bus, bus.Topology as IRabbitMqHostTopology))

await Console.Out.WriteLineAsync($"Message sent at {DateTime.Now}");
return ms.ScheduleSend(
    destinationUri,
    delay,
    message
);

1 Ответ

1 голос
/ 19 апреля 2020

Если вы планируете сообщения с шины, а не внутри потребителя, вам нужно использовать класс планировщика сообщений. В приведенном выше примере вы видите, что он отправляет адрес получателя, как если бы это был Quartz, но это не так. Чтобы использовать отложенный обмен за пределами ConsumeContext, необходимо создать класс MessageScheduler и передать его поставщику планировщика отложенного обмена RabbitMQ.

Не очевидно, я обновлю документацию и попытаюсь сделать это проще для планирования с шины с помощью не-кварцевых планировщиков.

Я также добавил новый метод, CreateRabbitMqMessageScheduler, который является методом расширения на IBus, который в основном создает необходимые компоненты для планирования с использованием отложенный обмен:

 if (bus.Topology is IRabbitMqHostTopology topology)
     return new MessageScheduler(new DelayedExchangeScheduleMessageProvider(bus, topology));

Вот что сделано под капотом.

...