MassTransit store Сообщение повторных попыток в памяти? - PullRequest
0 голосов
/ 09 ноября 2019

Политика MassTransit Retry позволяет нам устанавливать попытки повторных попыток. На потребительском уровне, мы можем получить его, вызвав

context.GetRetryAttempt ()

, но когда потребительское приложение перезапускается, оно начинается с 0. И мне нужно, чтобы оно запускалось с того места, где оно было до этого. сервер останавливается.

Поскольку обмен сообщениями RabbitMQ помогает мне достичь этого. Пример кода прилагается здесь. Могу ли я сделать подобное в MassTransit?

public class Consumer<T> : IConsumer<T>
{
    private readonly ConsumerConfigOptions _consumerConfigOptions;
    private readonly RabbitMqPoc.Interface.IConnectionFactory _connectionFactory;
    public event ReceiveMessage<T> ReceiveMessageEvent;

    private AckType AckType = AckType.Accept;
    private bool IsRequeue = false;
    private const string RetryExchange = "RetryExchange";
    private const string RetryKeyName = "x-retries";

    public Consumer(ConsumerConfigOptions consumerConfigOptions,
        RabbitMqPoc.Interface.IConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
        _consumerConfigOptions = consumerConfigOptions;
    }

    public void Consume(string queue)
    {
        IModel channel = _connectionFactory.GetConnection().CreateModel();

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var header = ea.BasicProperties.Headers;
            byte[] body = ea.Body;

            try
            {
                //throw exception here
                CheckForAcknowledge(channel,ea);
            }
            catch (Exception exception)
            {
                Requeue(channel, ea, body);
            }
        };
        channel.BasicConsume(queue: queue,
                         autoAck: false,
                         consumer: consumer);
    }

    private void Requeue(IModel channel, BasicDeliverEventArgs ea, byte[] body)
    {
        var header = ea.BasicProperties.Headers;

        if (header.ContainsKey(RetryKeyName))
        {
            SetRetryCount(header, channel, ea, body);
        }
        else
        {
            RePublishOnDlx(channel, ea, body, 0);
        }
    }

    private void SetRetryCount(IDictionary<string, object> header, IModel channel, BasicDeliverEventArgs ea, byte[] body)
    {
        int currentRetryAttempt = (int)header[RetryKeyName] + 1;

        if (currentRetryAttempt >= _consumerConfigOptions.RetryAttempts)
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        else
        {
            RePublishOnDlx(channel, ea, body, currentRetryAttempt);
        }
    }

    private void RePublishOnDlx(IModel channel, BasicDeliverEventArgs eventArgs, byte[] body, int retryAttempt)
    {
        IBasicProperties basicProperties = channel.CreateBasicProperties();

        basicProperties.Headers = new Dictionary<string, object>
        {
            { RetryKeyName, retryAttempt }
        };

        channel.BasicNack(eventArgs.DeliveryTag, false, false);
        channel.BasicPublish(RetryExchange, string.Empty, basicProperties, body);
    }

    public void Acknowledge(AckType ackType = AckType.Accept, bool isRequeue = false)
    {
        AckType = ackType;
        IsRequeue = isRequeue;
    }

    private void CheckForAcknowledge(IModel channel, BasicDeliverEventArgs ea)
    {
        if (AckType == AckType.Accept)
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        else
        {
            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: IsRequeue);
        }
    }

1 Ответ

1 голос
/ 09 ноября 2019

Вы правы, UseMessageRetry в MassTransit полностью в памяти. Если повторные попытки исчерпаны, сообщение будет перемещено в очередь _error . Если процесс завершится во время ожидания повторных попыток, сообщение будет заблокировано и останется в очереди. Когда это происходит, и процесс перезапускается, вы правы, политика повторных попыток начинается с нуля - поскольку исходное сообщение все еще находится в очереди и никакие заголовки не могут быть изменены.

Если вы хотите, вы можете пересылать сообщенияс плагином RabbitMQ с отложенным обменом для повторных попыток с использованием брокера. Фильтр UseScheduledRedelivery сделает это после того, как вы настроили подключаемый модуль отложенного обмена и планировщик сообщений на своей шине.

cfg.UseDelayedExchangeMessageScheduler();
...