Меня действительно раздражает, что мы не можем переместить сообщения с Dead Letter Queue
на Original Queue
для обработки при использовании Azure Servicebus. Итак, я понял, что я постараюсь реализовать эту функцию самостоятельно. Мы используем Masstransit
для публикации sh событий. Имя очереди в ASB будет полным именем сборки событий.
Я создал REST Endpoint
в своем приложении для перемещения сообщений из DLQ в исходную очередь для повторной обработки. Вот где я застрял на данный момент.
Чтобы получить все сообщения в DLQ, пользователь дает мне имя очереди, и я отформатирую его так, чтобы оно содержало DeadLetterQueue. Например:
myproject.events.usercreatedevent -> myproject.events.usercreatedevent/$DeadLetterQueue
Я получаю все сообщения из этой очереди, используя классы из пакета Nuget Microsoft. Azure .Servicebus
public async Task RequeueMessagesAsync(string queueName)
{
var msg = new MessageReceiver(BuildConnectionString(), queueName);
var messages = await msg.PeekAsync(50);
foreach (var message in messages)
{
var content = Encoding.UTF8.GetString(message.Body);
var jsonObject = JsonConvert.DeserializeObject<JObject>(content);
var destinationAddress = jsonObject["destinationAddress"].ToString();
var messageContent = jsonObject["message"].ToString();
var messageType = destinationAddress.Split("/").Last();
await _bus.SendAsync(jsonObject, messageType);
}
}
при вызове _bus.SendAsync(object, address)
сообщение заканчивается на _skipped queue
. Я думаю, что причина этого в том, что messageHeaders
имеет значение JObject
, а не фактический тип сообщения. Я также не могу использовать отражение, чтобы воссоздать событие, так как у нас есть много микросервисов и исходный код события, которое не обязательно доступно. Код, стоящий за _bus.SendAsync(object, address)
, выглядит следующим образом:
public async Task SendAsync(object message, string queueName, CancellationToken cancellationToken = default)
{
ISendEndpoint sender = await GetSenderAsync(queueName);
sender.ConnectSendObserver(new ErrorQueueConfiguration(_addressProvider.GetAddress("error")));
await sender.Send(message, cancellationToken);
}
Могу ли я обмануть Masstransit для пересылки этого «неизвестного» типа моему Потребителю, изменяя каким-либо образом MessageHeaders? Кто-нибудь успешно перенес сообщения из DLQ в исходную очередь?