Azure Servicebus, MassTransit и DLQ. Переход из DLQ в исходную очередь - PullRequest
0 голосов
/ 23 апреля 2020

Меня действительно раздражает, что мы не можем переместить сообщения с Dead Letter Queue на Original Queue для обработки при использовании Azure Servicebus. Итак, я понял, что я постараюсь реализовать эту функцию самостоятельно. Мы используем Masstransit для публикации sh событий. Имя очереди в ASB будет полным именем сборки событий.

Я создал REST Endpoint в своем приложении для перемещения сообщений из DLQ в исходную очередь для повторной обработки. Вот где я застрял на данный момент.

Чтобы получить все сообщения в DLQ, пользователь дает мне имя очереди, и я отформатирую его так, чтобы оно содержало DeadLetterQueue. Например:

myproject.events.usercreatedevent -> myproject.events.usercreatedevent/$DeadLetterQueue

Я получаю все сообщения из этой очереди, используя классы из пакета Nuget Microsoft. Azure .Servicebus

public async Task RequeueMessagesAsync(string queueName)
{
    var msg = new MessageReceiver(BuildConnectionString(), queueName);
    var messages = await msg.PeekAsync(50);

    foreach (var message in messages)
    {
        var content = Encoding.UTF8.GetString(message.Body);
        var jsonObject = JsonConvert.DeserializeObject<JObject>(content);
        var destinationAddress = jsonObject["destinationAddress"].ToString();
        var messageContent = jsonObject["message"].ToString();
        var messageType = destinationAddress.Split("/").Last();

        await _bus.SendAsync(jsonObject, messageType);
    }
}

при вызове _bus.SendAsync(object, address) сообщение заканчивается на _skipped queue. Я думаю, что причина этого в том, что messageHeaders имеет значение JObject, а не фактический тип сообщения. Я также не могу использовать отражение, чтобы воссоздать событие, так как у нас есть много микросервисов и исходный код события, которое не обязательно доступно. Код, стоящий за _bus.SendAsync(object, address), выглядит следующим образом:

public async Task SendAsync(object message, string queueName, CancellationToken cancellationToken = default)
{
    ISendEndpoint sender = await GetSenderAsync(queueName); 
    sender.ConnectSendObserver(new ErrorQueueConfiguration(_addressProvider.GetAddress("error")));

    await sender.Send(message, cancellationToken);
}

Могу ли я обмануть Masstransit для пересылки этого «неизвестного» типа моему Потребителю, изменяя каким-либо образом MessageHeaders? Кто-нибудь успешно перенес сообщения из DLQ в исходную очередь?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...