Сохранять заголовки при повторной доставке сообщения RabbitMq с помощью MassTransit - PullRequest
3 голосов
/ 21 мая 2019

Цель : мне нужно отслеживать заголовки при повторной доставке сообщения.

Конфигурация :

  • RabbitMQ 3.7.9
  • Эрланг 21,2
  • MassTransit 5.1.5
  • MySql 8.0 для базы данных Quartz

То, что я пробовал без успеха:

первая попытка:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
   if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
   {
      sendCtx.Headers.Set("SenderApp", sender);
   }
}).ConfigureAwait(false);

вторая попытка:

protected Task ScheduleSend(Uri rabbitUri, double delay)
{
  return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
    rabbitUri,
    TimeSpan.FromSeconds(delay),
    _Data,
    new HeaderPipe(_SenderApp, 0));
}

public class HeaderPipe : IPipe<SendContext>
{
  private readonly byte   _Priority;
  private readonly string _SenderApp;

  public HeaderPipe (byte priority)
  {
    _Priority  = priority;
    _SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
  }

  public HeaderPipe (string senderApp, byte priority)
  {
    _Priority  = priority;
    _SenderApp = senderApp;
  }

  public void Probe (ProbeContext context)
  { }

  public Task Send (SendContext context)
  {
    context.Headers.Set("SenderApp", _SenderApp);
    context.SetPriority(_Priority);
    return Task.CompletedTask;
  }
}

Ожидается : FinQuest.Robot.DBProcess

Результат : ноль

Я вхожу в метод Consume моего SenderApp. В первый раз это выглядит так

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)

и выглядит так после доставки

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)

Что я делаю не так? Я не хочу использовать функцию повтора из-за ее максимального количества повторов (я не хочу ограничиваться).

Заранее спасибо.

1 Ответ

3 голосов
/ 21 мая 2019

Существует метод, используемый фильтром повторной доставки, который вы можете использовать:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90

public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)

В своем коде вы будете использовать его:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
    sendCtx.TransferConsumeContextHeaders(consumeCtx);
});
...