EventProcessorHost возвращает MessagingCommunicationException и теряет данные - PullRequest
0 голосов
/ 07 февраля 2020

Я использую библиотеку Microsoft.ServiceBus.Messaging.EventProcessorHost в. NET Framework 4.6.2 для приема сообщений из EventHub.

Потребитель, установленный на виртуальной машине с облачным хостом, будет надежно работать получите следующее исключение при первоначальном подключении (и переподключении) к концентратору событий. Это исключение перехватывается событием EventProcessorOptions.ExceptionReceived (см. Фрагмент кода ниже).

Не настроено retrypolicy, поэтому используется заданная по умолчанию retrypolicy, и, в конце концов, сообщения начинают поступать. Однако данные, которые были получены по событию концентратор, в то время как потребитель не был подключен, кажется, потерян, то есть эти сообщения не получены.

Я ищу идеи о том, как решить эту проблему, так как это кажется определенным c для этой машины другие машины не получают исключений и не теряют данные.

Я вижу порядковый номер между последним сообщением перед повторным подключением и следующим сообщением после всех исключений, как показано ниже, поэтому потеряно более 100 сообщений .

Обновление

Я посмотрел на Offset и SequenceNumber для EventHub в учетной записи хранения, и даже во время моего приложения остановлен, оба меняются. Означает ли это, что другой потребитель из того же концентратора событий и группы потребителей работает и проверяет?

2020-02-07 05:46:45.707 [7] INFO d__5::MoveNext(0) Received message with EnqueuedTimeUtc: 2/7/2020 5:46:45 AM, Offset: 657159864856, <strong>SequenceNumber: 898009</strong> ... (see exception below for what happens in between) ... 2020-02-07 06:07:36.136 [10] INFO d__5::MoveNext(0) Received message with EnqueuedTimeUtc: 2/7/2020 6:07:01 AM, Offset: 657160878008, <strong>SequenceNumber: 898162</strong>

2020-02-07 06:00:01.051 [19] ERROR <>c::b__0_0(0) EventProcessHost MessagingCommunicationException received 2020-02-07 06:00:01.053 [19] ERROR Logger::LogException(0) Caught exception: <strong>An error occurred during communication with '0f0e81e45a9d4a24b79d5f16c532cad9_G26:13.93.226.138:10402'. Check the connection information, then retry.</strong> 2020-02-07 06:00:01.054 [19] ERROR Logger::LogException(0) Inner exception: <strong>A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond</strong> 2020-02-07 06:00:01.056 [19] DEBUG Logger::LogException(0) Stack trace following: at Microsoft.ServiceBus.Common.ExceptionExtensions.ThrowException(Exception exception) at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result) at Microsoft.ServiceBus.Messaging.EventHubConsumerGroup.<>c.b__65_1(IAsyncResult r) at System.Threading.Tasks.TaskFactory<code>1.FromAsyncCoreLogic(IAsyncResult iar, Func 2 endFunction, действие 1 endAction, Task 1, логическое значение требует синхронизация) - Конец трассировки стека из предыдущего расположения, в котором было сгенерировано исключение --- в System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw () в System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (задача-задача) в Microsoft.ServiceBus.CelpersTas. EndAsyncResult (IAsyncResult asyncResult) в Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback (результат IAsyncResult)

Фрагмент кода:

public static async Task CreateEventHubProcessor( StreamingClientConfiguration streamingServiceConfiguration, string eventHubConnectionString, string eventHubName) { try { try { ServiceBusEnvironment.SystemConnectivity.Mode = (ConnectivityMode)streamingServiceConfiguration.ConnectivityMode; } catch (Exception) { ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect; }</p> <pre><code> var storageAccountConnectionString = ConfigurationManager.ConnectionStrings["AzureStorageAccount"].ConnectionString; var eventProcessorHostName = Guid.NewGuid().ToString(); var eventProcessorHost = new EventProcessorHost( eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageAccountConnectionString); Logger.LogInfo("Registering message processor...", LogOptions.LogToFileAndConsole); var options = new EventProcessorOptions(); options.ExceptionReceived += (sender, e) => { var exceptionType = e.Exception.GetType().Name; Logger.LogError($"EventProcessHost {exceptionType} received", LogOptions.LogToFileAndConsole); Logger.LogException(e.Exception, LogOptions.LogToFileAndConsole); }; await eventProcessorHost.RegisterEventProcessorFactoryAsync( new CustomerEventProcessorFactory(streamingServiceConfiguration), options); Logger.LogSuccess("Done.", LogOptions.LogToFileAndConsole); return eventProcessorHost; } catch (Exception ex) { Logger.LogError("Exception occurred creating EventProcessorHost"); Logger.LogException(ex, LogOptions.LogToFileAndConsole); throw; } } 1031 *

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...