Обновите контрольную точку смещения раздела EventHub на Azure .Messaging.EventHubs.EventProcessorClient при простое - PullRequest
0 голосов
/ 23 апреля 2020

В моем сценарии у меня будет множество событий, приходящих сразу, а затем длительные периоды времени, когда EventHub будет простаивать. В моем процессоре-клиенте я хочу проверять каждые N событий или N минут (в зависимости от того, что наступит раньше).

Вот как я настроил Azure .Messaging.EventHubs.EventProcessorClient:

EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}

В моем ProcessEventHandler я проверяю событияProcessedSinceLastCheckpoint, а также время, прошедшее с секундомером. Когда любой из них достигает своего максимума, я сбрасываю оба и отмечаю это в своем окне консоли:

static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
   ++eventsProcessed;
   ++eventsProcessedSinceLastCheckpoint;

   Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
    if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
    return Task.CompletedTask;

}

Проверка переменной eventsProcessedSinceLastCheckpoint работает отлично, так как ProcessEventHandler запускается всякий раз, когда приходят новые события. Однако, когда EventHub бездействует ProcessEventHandler не вызывается, поэтому в случаях, когда EventHub работает тихо в течение многих минут или часов, я никогда не буду проверять контрольную точку по истечении времени.

Я понимаю, что могу просто удалить таймер и что мой процессор должен быть в состоянии обработать повторяющиеся события, если между контрольными точками происходит sh. Но в моем сценарии (так как у меня будут такие долгие простои), я хочу воспользоваться своим временем и наверстать упущенное, чтобы избежать появления дополнительных дубликатов, когда я смогу. Отсюда добавление таймера в качестве запасного во время периодов простоя.

Мой вопрос: Как мне позвонить UpdateCheckpointAsyn c () за пределами ProcessEventHandler ? Кажется, что метод существует только в ProcessEventArgs . Я не могу вызвать его напрямую на EventProcessorClient, что было бы идеально, так как я могу переместить проверку таймера за пределы ProcessEventHandler и в мое время l oop ....

1 Ответ

0 голосов
/ 23 апреля 2020

Установка свойства EventHubProcessorClientOptions.MaximumWaitTime при создании экземпляра вашего процессора позволит вызывать ваш обработчик, когда никакие события не читаются. При значении, отличном от нуля, время ожидания в основном означает «сообщать мне события, как только вы их получите, но проверять мой обработчик, если в течение этого интервала не было прочитано никаких событий».

В отношении обновления контрольных точек в В этом сценарии рекомендуемый подход заключается в том, чтобы кэшировать аргументы для последнего события, отправленного обработчику, и использовать его для вызова UpdateCheckpointAsync. В этом примере демонстрируется подход, обеспечивающий создание контрольной точки при остановке обработки раздела.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...