В моем сценарии у меня будет множество событий, приходящих сразу, а затем длительные периоды времени, когда EventHub будет простаивать. В моем процессоре-клиенте я хочу проверять каждые N событий или N минут (в зависимости от того, что наступит раньше).
Вот как я настроил Azure .Messaging.EventHubs.EventProcessorClient:
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
// Start the processing
await processor.StartProcessingAsync();
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(10));
Console.WriteLine($"{eventsProcessed} events have been processed");
}
В моем ProcessEventHandler я проверяю событияProcessedSinceLastCheckpoint, а также время, прошедшее с секундомером. Когда любой из них достигает своего максимума, я сбрасываю оба и отмечаю это в своем окне консоли:
static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
++eventsProcessed;
++eventsProcessedSinceLastCheckpoint;
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
// After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
{
eventsProcessedSinceLastCheckpoint = 0;
_checkpointStopWatch.Restart();
await eventArgs.UpdateCheckpointAsync();
Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
}
return Task.CompletedTask;
}
Проверка переменной eventsProcessedSinceLastCheckpoint работает отлично, так как ProcessEventHandler запускается всякий раз, когда приходят новые события. Однако, когда EventHub бездействует ProcessEventHandler не вызывается, поэтому в случаях, когда EventHub работает тихо в течение многих минут или часов, я никогда не буду проверять контрольную точку по истечении времени.
Я понимаю, что могу просто удалить таймер и что мой процессор должен быть в состоянии обработать повторяющиеся события, если между контрольными точками происходит sh. Но в моем сценарии (так как у меня будут такие долгие простои), я хочу воспользоваться своим временем и наверстать упущенное, чтобы избежать появления дополнительных дубликатов, когда я смогу. Отсюда добавление таймера в качестве запасного во время периодов простоя.
Мой вопрос: Как мне позвонить UpdateCheckpointAsyn c () за пределами ProcessEventHandler ? Кажется, что метод существует только в ProcessEventArgs . Я не могу вызвать его напрямую на EventProcessorClient, что было бы идеально, так как я могу переместить проверку таймера за пределы ProcessEventHandler и в мое время l oop ....