EasyNetQ - Как повторить неудачные сообщения и сохранить RetryCount в теле / ​​заголовке сообщения? - PullRequest
0 голосов
/ 14 февраля 2019

Я использую EasyNetQ и мне нужно повторить неудачные сообщения в исходной очереди.Проблема заключается в том, что, хотя я успешно увеличиваю переменную TriedCount (в теле каждой msg), когда EasyNetQ публикует сообщение в очередь ошибок по умолчанию после исключения, обновленный TriedCount отсутствует в msg!Предположительно потому, что он просто сбрасывает исходное сообщение в очередь ошибок без изменений потребителя.

Обновленный TriedCount работает для повторных публикаций в процессе, но не при повторной публикации через EasyNetQ Hosepipe или EasyNetQ Management Client.Текстовые файлы Сгенерированные шланги не имеют обновленного TriedCount.

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

public class TryInfo
{   
    public int TriedCount { get; set; }

    /*Other information regarding msg attempt*/
}

public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
    IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
    // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
    // The mediator can transmit the retried msg or choose to ignore it
    return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}

Я также пытался переиздать себя через Management API (грубый код):

var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue", 
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, 
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, 
crit).Result;

foreach (var errMsg in errMsgs)
{
    var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
                                 new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
        }

Это работает, но публикуется только в очередь ошибок снова, а не в исходной очереди.Кроме того, я не знаю, как добавить / обновить информацию о повторных попытках в теле сообщения на этом этапе.

Я исследовал эту библиотеку, чтобы добавить заголовки к сообщению, но яне вижу, если счет в теле не обновляется, как / почему счет в заголовке должен быть обновлен.

Есть ли способ сохранить TriedCount без обращения к шине Advanced (в которойВ случае, если я мог бы использовать сам клиент RabbitMQ .Net)?

1 Ответ

0 голосов
/ 14 марта 2019

На всякий случай, если это кому-то поможет, я в итоге реализовал свой собственный IErrorMessageSerializer (в отличие от реализации всего IConsumerErrorStrategy , который выглядел как избыточное убийство).Причина, по которой я добавляю информацию о повторных попытках в тело (вместо заголовка), заключается в том, что EasyNetQ не обрабатывает сложные типы в заголовке (во всяком случае, не из коробки).Таким образом, использование словаря дает больше контроля для разных потребителей.Я регистрирую пользовательский сериализатор во время создания шины следующим образом:

_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));

И просто реализовал метод Serialize следующим образом:

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

Фактическая повторная попытка выполняется простойКонсольное приложение / служба Windows периодически через API-интерфейс управления EasyNetQ:

            var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
            var vhost = client.GetVhostAsync("/").Result;
            var aliveRes = client.IsAliveAsync(vhost).Result;
            var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
            var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
            var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
            foreach (var errMsg in errMsgs)
            {
                var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
                var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
                pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
                pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
                pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
                var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
                     pubInfo).Result;
            }

Независимо от того, включен ли повтор или нет, сам мой потребитель знает, что дает ему больше контроля, позволяя ему выбирать обработанное сообщение или просто игнорироватьЭто.После игнорирования сообщение, очевидно, больше не будет пробоваться;Вот как работает EasyNetQ.

...