Цель : мне нужно отслеживать заголовки при повторной доставке сообщения.
Конфигурация :
- RabbitMQ 3.7.9
- Эрланг 21,2
- MassTransit 5.1.5
- MySql 8.0 для базы данных Quartz
То, что я пробовал без успеха:
первая попытка:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
{
sendCtx.Headers.Set("SenderApp", sender);
}
}).ConfigureAwait(false);
вторая попытка:
protected Task ScheduleSend(Uri rabbitUri, double delay)
{
return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
rabbitUri,
TimeSpan.FromSeconds(delay),
_Data,
new HeaderPipe(_SenderApp, 0));
}
public class HeaderPipe : IPipe<SendContext>
{
private readonly byte _Priority;
private readonly string _SenderApp;
public HeaderPipe (byte priority)
{
_Priority = priority;
_SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
}
public HeaderPipe (string senderApp, byte priority)
{
_Priority = priority;
_SenderApp = senderApp;
}
public void Probe (ProbeContext context)
{ }
public Task Send (SendContext context)
{
context.Headers.Set("SenderApp", _SenderApp);
context.SetPriority(_Priority);
return Task.CompletedTask;
}
}
Ожидается : FinQuest.Robot.DBProcess
Результат : ноль
Я вхожу в метод Consume моего SenderApp. В первый раз это выглядит так
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)
и выглядит так после доставки
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)
Что я делаю не так? Я не хочу использовать функцию повтора из-за ее максимального количества повторов (я не хочу ограничиваться).
Заранее спасибо.