Возврат от потребителя Кафки, когда нет сообщения - PullRequest
0 голосов
/ 23 марта 2019

Я хочу обработать тему при запуске приложения, используя Confluent dotnet client .Предположим следующий пример:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

Когда в Kafka нет нового сообщения, c.Consume будет заблокирован.Поскольку я хочу использовать его для запуска приложения (например, для разогрева кэша), я хочу продолжить свой код, когда обнаружил, что нового сообщения нет.

Я знаю, что существует перегрузка для установки тайм-аута, например c.Consume(timeout), но проблема с этим подходом состоит в том, что если у вас есть сообщение в вашей теме, и время чтения сообщения было больше, чем ваш тайм-аут, выполучить нулевой вывод, который не желателен.

Ответы [ 2 ]

1 голос
/ 24 марта 2019

Вы можете использовать OnPartitionEOF событие, которое указывает, что вы достигли конца раздела.

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}
1 голос
/ 24 марта 2019

Потребитель (и) не должен знать о производителе (ах).

Теперь, если вы хотите знать, что вы прочитали все в теме с того момента, как вы начали потреблять, вы можете:

  1. Загрузить новейшее смещение перед началом потребления.
  2. Тогда начните потреблять сообщения.
  3. Если смещение сообщения совпадает с последним смещением, которое вы загрузили ранее, прекратите потребление.

Я не являюсь C# разработчиком, но из того, что я прочитал в документированном документе dotnet, вы можете позвонить QueryWatermarkOffsets на потребителя, чтобы получить самое старое и новейшее смещение. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_

А затем, в классе Message у вас есть Offset аксессор. Таким образом, все это не должно быть слишком сложным для достижения. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset

...