Я использую EasyNetQ и мне нужно повторить неудачные сообщения в исходной очереди.Проблема заключается в том, что, хотя я успешно увеличиваю переменную TriedCount (в теле каждой msg), когда EasyNetQ публикует сообщение в очередь ошибок по умолчанию после исключения, обновленный TriedCount отсутствует в msg!Предположительно потому, что он просто сбрасывает исходное сообщение в очередь ошибок без изменений потребителя.
Обновленный TriedCount работает для повторных публикаций в процессе, но не при повторной публикации через EasyNetQ Hosepipe или EasyNetQ Management Client.Текстовые файлы Сгенерированные шланги не имеют обновленного TriedCount.
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
public class TryInfo
{
public int TriedCount { get; set; }
/*Other information regarding msg attempt*/
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
Я также пытался переиздать себя через Management API (грубый код):
var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue",
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue,
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue,
crit).Result;
foreach (var errMsg in errMsgs)
{
var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
}
Это работает, но публикуется только в очередь ошибок снова, а не в исходной очереди.Кроме того, я не знаю, как добавить / обновить информацию о повторных попытках в теле сообщения на этом этапе.
Я исследовал эту библиотеку, чтобы добавить заголовки к сообщению, но яне вижу, если счет в теле не обновляется, как / почему счет в заголовке должен быть обновлен.
Есть ли способ сохранить TriedCount без обращения к шине Advanced (в которойВ случае, если я мог бы использовать сам клиент RabbitMQ .Net)?