Я пытаюсь отправить запланированное сообщение, используя UseDelayedExchangeMessageScheduler
вместе с плагином rabbitmq_delayed_message_exchange
. Я настраиваю шину так:
public void StartUpBus()
{
_bus = Bus.Factory.CreateUsingRabbitMq(ConfigureBus);
_bus.Start();
}
private void ConfigureBus(IRabbitMqBusFactoryConfigurator busConfigurator)
{
var host = busConfigurator.Host(new Uri(_connectionInfo.ConnectionString), h =>
{
h.Username(_connectionInfo.User);
h.Password(_connectionInfo.Password);
});
busConfigurator.UseDelayedExchangeMessageScheduler();
busConfigurator.ReceiveEndpoint(host, "schedule-send-endpoint.inbox", endpoint => {
endpoint.PrefetchCount = 1;
endpoint.Consumer( () => new AScheduledConsumer() );
});
}
Потребитель прост, как этот
public class AScheduledConsumer : IConsumer<AScheduledMessage>
{
public Task Consume(ConsumeContext<AScheduledMessage> context)
{
return Console.Out.WriteLineAsync($"Message received at {DateTime.Now}");
}
}
Вот сообщение, которое я хотел бы отправить
// Simple role interface to easily identify bus travelling data
public interface IMessage
{
}
public class AScheduledMessage : IMessage
{
}
Затем я пытаюсь отправить сообщение с чем-то вроде этого
var message = new AScheduledMessage();
var delay = TimeSpan.FromMinutes(1);
Uri destinationUri = new Uri("rabbitmq://localhost/schedule-send-endpoint.inbox");
await Console.Out.WriteLineAsync($"Message sent at {DateTime.Now}");
return _bus.ScheduleSend(
destinationUri,
delay,
message
);
Я ожидал, что сообщение будет доставлено примерно через минуту после отправки, но сообщение заканчивается в очереди _skipped. Вот после сообщения, которое выводится функцией «Получить сообщение» в пользовательском интерфейсе управления RabbitMq.
The server reported 0 messages remaining.
Exchange schedule-send-endpoint.inbox_skipped
Routing Key
Redelivered ○
Properties
message_id: d7040000-4392-98e7-c8e1-08d7e3d61e54
correlation_id: d7040000-4392-98e7-cc70-08d7e3d61e49
delivery_mode: 2
headers:
Content-Type: application/vnd.masstransit+json
MT-Host-Assembly: Infrastructure.Messaging.RabbitMq.Test.ConsoleApp
MT-Host-AssemblyVersion: 1.0.0.0
MT-Host-FrameworkVersion: 4.0.30319.42000
MT-Host-MachineName: GABROS-NB
MT-Host-MassTransitVersion: 5.2.1.1808
MT-Host-OperatingSystemVersion: Microsoft Windows NT 6.2.9200.0
MT-Host-ProcessId: 26984
MT-Host-ProcessName: Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost
MT-Reason: dead-letter
infrastructure.correlation-id: 029ea5c6-e5ee-44b7-8851-84d3b6ebd191
infrastructure.user-id: anonymous
publishId: 1
content_type: application/vnd.masstransit+json
Payload
1649 bytes
Encoding: string
{
"messageId": "d7040000-4392-98e7-c8e1-08d7e3d61e54",
"correlationId": "d7040000-4392-98e7-cc70-08d7e3d61e49",
"conversationId": "d7040000-4392-98e7-29ed-08d7e3d61e5d",
"sourceAddress": "rabbitmq://localhost/bus-GABROS-NB-Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost-4hnyyynd1kcqqhmibdm68io7fu?durable=false&autodelete=true",
"destinationAddress": "rabbitmq://localhost/schedule-send-endpoint.inbox",
"messageType": [
"urn:message:MassTransit.Scheduling:ScheduleMessage[[Infrastructure.Messaging.Test:AScheduledMessage]]",
"urn:message:MassTransit.Scheduling:ScheduleMessage"
],
"message": {
"correlationId": "d7040000-4392-98e7-cc70-08d7e3d61e49",
"scheduledTime": "2020-04-18T20:21:47.1796308Z",
"payloadType": [
"urn:message:Infrastructure.Messaging.Test:AScheduledMessage",
"urn:message:Infrastructure.Messaging:IMessage"
],
"destination": "rabbitmq://localhost/schedule-send-endpoint.inbox",
"payload": {}
},
"sentTime": "2020-04-18T20:21:46.8178828Z",
"headers": { },
"host": {
"machineName": "GABROS-NB",
"processName": "Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost",
"processId": 26984,
"assembly": "Infrastructure.Messaging.RabbitMq.Test.ConsoleApp",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "4.0.30319.42000",
"massTransitVersion": "5.2.1.1808",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
}
}
И это обмены, которые я нахожу в RabbitMQ
Name Type Features Message rate in Message rate out+/-
(AMQP default) direct D
Infrastructure.Messaging.Test:AScheduledMessage fanout D
amq.direct direct D
amq.fanout fanout D
amq.headers headers D
amq.match headers D
amq.rabbitmq.trace topic D I
amq.topic topic D
bus-GABROS-NB-Infrastructure.Messaging.RabbitMq.Test.ConsoleApp.vshost-4hnyyynd1kcqqysnbdm6jy77ny fanout AD
schedule-send-endpoint.inbox fanout D 0.00/s 0.00/s
schedule-send-endpoint.inbox_skipped fanout D 0.00/s 0.00/s
Что Я нахожу странным полное отсутствие заголовка x-delay в сообщении и отсутствие Exchange-сообщения x-delayed-сообщения, созданного в rabbitMq, как если бы UseDelayedExchangeMessageScheduler
был полностью проигнорирован ... Я думаю, что я делаю что-то не так, но действительно не могу найти виновного!
--- ОБНОВЛЕНИЕ ---
Как указал @ChrisPatterson, сообщение должно быть отправлено с MessageScheduler
,
Хотя решение MassTransit 5.2.1 пока не найдено, этот код работает с использованием MassTransit v6.4.2 и выполняет tnet core 2:
var message = new AScheduledMessage();
var delay = TimeSpan.FromMinutes(1);
Uri destinationUri = new Uri("rabbitmq://localhost/schedule-send-endpoint.inbox");
var ms = new MessageScheduler(new DelayedExchangeScheduleMessageProvider(bus, bus.Topology as IRabbitMqHostTopology))
await Console.Out.WriteLineAsync($"Message sent at {DateTime.Now}");
return ms.ScheduleSend(
destinationUri,
delay,
message
);