MassTransit - предотвращает отправку сообщения в пропущенную очередь RMQ при использовании фильтра - PullRequest
0 голосов
/ 16 марта 2020

Я использую MassTransit с RabbitMQ. Следуя примеру на официальной странице для пользовательского промежуточного программного обеспечения, я пытаюсь создать фильтр в конвейере потребления сообщений, который будет отфильтровывать некоторые сообщения на основе определенного условия. Мой фильтр выглядит следующим образом:

public class MyCustomFilter<T> : IFilter<T>
    where T : class, ConsumeContext
{
    public void Probe(ProbeContext context) { }

    public async Task Send(T context, IPipe<T> next)
    {
        if (/* certain condition */)
        {
            await next.Send(context);
        }
    }
}

Проблема заключается в том, что когда сообщение не передается по конвейеру (т. Е. await next.Send(context) не вызывается), сообщение заканчивается в очереди _skipped потребительской RabbitMQ. Есть ли способ предотвратить попадание сообщения в эту очередь?

1 Ответ

2 голосов
/ 16 марта 2020

Очередь skipped (недоставленное письмо) получает сообщение с помощью вызова DeadLetterFilter. Вот код:

async Task IFilter<ReceiveContext>.Send(ReceiveContext context, IPipe<ReceiveContext> next)
{
    await next.Send(context).ConfigureAwait(false);

    if (context.IsDelivered || context.IsFaulted)
        return;

    context.LogSkipped();

    await _deadLetterPipe.Send(context).ConfigureAwait(false);
}

Итак, вы можете себе представить, что если для контекста IsDelivered или IsFaulted установлено значение true, ваши сообщения не попадут в очередь недоставленных сообщений .

Если вы добавите фильтр, ваши сообщения окажутся в очереди «ядовитых» (error), поэтому, я думаю, это не вариант.

Вы можете смоделировать доставку сообщений. сделав что-то подобное для отфильтрованных сообщений в вашем фильтре:

public Task Send(T context, IPipe<T> next)
    => condition
        ? next.Send(context)
        : context.NotifyConsumed(context as ConsumeContext<MyMessage>, TimeSpan.Zero, "Filtered");
...