NServiceBus с AzureStorageQueues не удаляет подозрительные сообщения из входной очереди из-за изменения свойств сообщения - PullRequest
0 голосов
/ 20 апреля 2020

Я экспериментирую с новым проектом NServiceBus, использующим Azure Очереди хранения для транспортировки сообщений и JSON сериализацию с использованием настраиваемых журналов развертывания сообщений c, которые можно увидеть здесь:

            var jsonSerializer = new Newtonsoft.Json.JsonSerializer();
            transportExtensions.UnwrapMessagesWith(cloudQueueMessage =>
            {
                using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
                using (var streamReader = new StreamReader(stream))
                using (var textReader = new JsonTextReader(streamReader))
                {
                    try
                    {
                        var jObject = JObject.Load(textReader);

                        using (var jsonReader = jObject.CreateReader())
                        {
                            // Try deserialize to a NServiceBus envelope first
                            var wrapper = jsonSerializer.Deserialize<MessageWrapper>(jsonReader);

                            if (wrapper.MessageIntent != default)
                            {
                                // This was a envelope message
                                return wrapper;
                            }
                        }

                        // Otherwise this was an EventGrid event
                        using (var jsonReader = jObject.CreateReader())
                        {
                            var @event = jsonSerializer.Deserialize<EventGridEvent>(jsonReader);

                            var wrapper = new MessageWrapper
                            {
                                Id = @event.Id,
                                Headers = new Dictionary<string, string>
                            {
                                { "NServiceBus.EnclosedMessageTypes", @event.EventType },
                                { "NServiceBus.MessageIntent", "Publish" },
                                { "EventGrid.topic", @event.Topic },
                                { "EventGrid.subject", @event.Subject },
                                { "EventGrid.eventTime", @event.EventTime.ToString("u") },
                                { "EventGrid.dataVersion", @event.DataVersion },
                                { "EventGrid.metadataVersion", @event.MetadataVersion },
                            },
                                Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event.Data)),
                                MessageIntent = MessageIntentEnum.Publish
                            };

                            return wrapper;
                        }
                    }
                    catch
                    {
                        logger.Error("Message deserialization failed, sending message to error queue");
                        throw;
                    }
                }
            });

Настраиваемые журналы развертывания сообщений c работает правильно для правильно отформатированных JSON сообщений, и когда неправильно отформатированное сообщение JSON помещается во входную очередь, пользовательское сообщение развертывания logi c выдает ошибку в первой строке внутри использования, где я создаю jObject что является ожидаемым поведением. Однако, когда пользовательское развертывание сообщения logi c завершается неудачей, ошибка будет поймана logi c в классе MessageRetrieved, который является частью пакета NServiceBus. Azure .Transports.WindowsAzureStorageQueues NuGet (v8.2.0), показанного ниже :

        public async Task<MessageWrapper> Unwrap()
        {
            try
            {
                Logger.DebugFormat("Unwrapping message with native ID: '{0}'", rawMessage.Id);
                return unwrapper.Unwrap(rawMessage);
            }
            catch (Exception ex)
            {
                await errorQueue.AddMessageAsync(rawMessage).ConfigureAwait(false);
                await inputQueue.DeleteMessageAsync(rawMessage).ConfigureAwait(false);

                throw new SerializationException($"Failed to deserialize message envelope for message with id {rawMessage.Id}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
            }
        }

Первая строка попытки перехвата выполняется правильно, добавляя сообщение в сконфигурированную очередь ошибок, однако при этом он, похоже, меняет идентификатор сообщения и отправку необработанного сообщения как видно здесь:

Начальные значения сообщения

Обновленные значения сообщения

Затем, когда запускается следующая строка, пытаясь удалить оригинал сообщение из входной очереди не может найти его, так как в соответствии с этой статьей https://docs.microsoft.com/en-us/rest/api/storageservices/delete-message2#remarks требуется исходный идентификатор сообщения и всплывающая квитанция, которые теперь изменились, что приводит к появлению следующей ошибки:

2020-04-20 14:17:58,603 WARN : Azure Storage Queue transport failed pushing a message through pipeline
Type: Microsoft.WindowsAzure.Storage.StorageException
Message: The remote server returned an error: (404) Not Found.
Source: Microsoft.WindowsAzure.Storage
StackTrace:
   at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs:line 50
   at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass7.<CreateCallbackVoid>b__5(IAsyncResult ar) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:line 121
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessageRetrieved.<Unwrap>d__3.MoveNext() in C:\BuildAgent\work\3c19e2a032c05076\src\Transport\MessageRetrieved.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext() in C:\BuildAgent\work\3c19e2a032c05076\src\Transport\MessagePump.cs:line 153
TargetSite: T EndExecuteAsync[T](System.IAsyncResult)

Это проблема с логикой пакета NServiceBus c, или это что-то из моего пользовательского сообщения, развертывающего логи c вызывает изменение этих значений?

1 Ответ

0 голосов
/ 22 апреля 2020

Это ошибка. Когда развертывание не удается, сообщение еще не проходит через конвейер обработки. В результате этого нормальная восстанавливаемость не применяется. CloudQueueMessage необходимо «клонировать», а клон отправить в очередь ошибок, в то время как исходное сообщение использовалось для его удаления из очереди ввода. Я поднял ошибку в GitHub, и вы можете отслеживать процесс там.

...