Я использую библиотеку Microsoft.ServiceBus.Messaging.EventProcessorHost в. NET Framework 4.6.2 для приема сообщений из EventHub.
Потребитель, установленный на виртуальной машине с облачным хостом, будет надежно работать получите следующее исключение при первоначальном подключении (и переподключении) к концентратору событий. Это исключение перехватывается событием EventProcessorOptions.ExceptionReceived (см. Фрагмент кода ниже).
Не настроено retrypolicy, поэтому используется заданная по умолчанию retrypolicy, и, в конце концов, сообщения начинают поступать. Однако данные, которые были получены по событию концентратор, в то время как потребитель не был подключен, кажется, потерян, то есть эти сообщения не получены.
Я ищу идеи о том, как решить эту проблему, так как это кажется определенным c для этой машины другие машины не получают исключений и не теряют данные.
Я вижу порядковый номер между последним сообщением перед повторным подключением и следующим сообщением после всех исключений, как показано ниже, поэтому потеряно более 100 сообщений .
Обновление
Я посмотрел на Offset и SequenceNumber для EventHub в учетной записи хранения, и даже во время моего приложения остановлен, оба меняются. Означает ли это, что другой потребитель из того же концентратора событий и группы потребителей работает и проверяет?
2020-02-07 05:46:45.707 [7] INFO d__5::MoveNext(0) Received message with EnqueuedTimeUtc: 2/7/2020 5:46:45 AM, Offset: 657159864856, <strong>SequenceNumber: 898009</strong>
...
(see exception below for what happens in between)
...
2020-02-07 06:07:36.136 [10] INFO d__5::MoveNext(0) Received message with EnqueuedTimeUtc: 2/7/2020 6:07:01 AM, Offset: 657160878008, <strong>SequenceNumber: 898162</strong>
2020-02-07 06:00:01.051 [19] ERROR <>c::b__0_0(0) EventProcessHost MessagingCommunicationException received
2020-02-07 06:00:01.053 [19] ERROR Logger::LogException(0) Caught exception: <strong>An error occurred during communication with '0f0e81e45a9d4a24b79d5f16c532cad9_G26:13.93.226.138:10402'. Check the connection information, then retry.</strong>
2020-02-07 06:00:01.054 [19] ERROR Logger::LogException(0) Inner exception: <strong>A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond</strong>
2020-02-07 06:00:01.056 [19] DEBUG Logger::LogException(0) Stack trace following:
at Microsoft.ServiceBus.Common.ExceptionExtensions.ThrowException(Exception exception)
at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.ServiceBus.Messaging.EventHubConsumerGroup.<>c.b__65_1(IAsyncResult r)
at System.Threading.Tasks.TaskFactory<code>1.FromAsyncCoreLogic(IAsyncResult iar, Func
2 endFunction, действие 1 endAction, Task
1, логическое значение требует синхронизация) - Конец трассировки стека из предыдущего расположения, в котором было сгенерировано исключение --- в System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw () в System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (задача-задача) в Microsoft.ServiceBus.CelpersTas. EndAsyncResult (IAsyncResult asyncResult) в Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback (результат IAsyncResult)
Фрагмент кода:
public static async Task CreateEventHubProcessor(
StreamingClientConfiguration streamingServiceConfiguration,
string eventHubConnectionString,
string eventHubName)
{
try
{
try
{
ServiceBusEnvironment.SystemConnectivity.Mode =
(ConnectivityMode)streamingServiceConfiguration.ConnectivityMode;
}
catch (Exception)
{
ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;
}</p>
<pre><code> var storageAccountConnectionString =
ConfigurationManager.ConnectionStrings["AzureStorageAccount"].ConnectionString;
var eventProcessorHostName = Guid.NewGuid().ToString();
var eventProcessorHost = new EventProcessorHost(
eventProcessorHostName,
eventHubName,
EventHubConsumerGroup.DefaultGroupName,
eventHubConnectionString,
storageAccountConnectionString);
Logger.LogInfo("Registering message processor...", LogOptions.LogToFileAndConsole);
var options = new EventProcessorOptions();
options.ExceptionReceived += (sender, e) =>
{
var exceptionType = e.Exception.GetType().Name;
Logger.LogError($"EventProcessHost {exceptionType} received", LogOptions.LogToFileAndConsole);
Logger.LogException(e.Exception, LogOptions.LogToFileAndConsole);
};
await eventProcessorHost.RegisterEventProcessorFactoryAsync(
new CustomerEventProcessorFactory(streamingServiceConfiguration),
options);
Logger.LogSuccess("Done.", LogOptions.LogToFileAndConsole);
return eventProcessorHost;
}
catch (Exception ex)
{
Logger.LogError("Exception occurred creating EventProcessorHost");
Logger.LogException(ex, LogOptions.LogToFileAndConsole);
throw;
}
}
1031 *