Kafka .NET - получить все существующие записи по этой конкретной теме, а затем продолжить прослушивание новых - PullRequest
0 голосов
/ 25 ноября 2018

В базовой конфигурации Kafka .NET, например, такой:

conf = new ConsumerConfig
{
            GroupId = $"{_topic}{CacheMessageConsumerSuffix}",
            BootstrapServers = Servers,
            AutoOffsetReset = AutoOffsetResetType.Earliest,
};


using (var c = new Consumer<string, string>(conf))
{
    c.Subscribe(_topic);

    bool consuming = true;
    c.OnError += (_, e) => consuming = !e.IsFatal;

    while (consuming)
    {
        var cr = c.Consume();
    }
}

Этот потребитель при запуске будет получать только новые сообщения от производителя.Как сначала получить все существующие записи по этой конкретной теме, а затем продолжить прослушивание новых?

Спасибо

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Я думаю, что я разработал, как это сделать.Как предлагается в этом сообщении в блоге, http://blog.empeccableweb.com/wp/2016/11/30/manual-offsets-in-kafka-consumers-example/, вы можете настроить обработчик событий, который будет запускаться при назначении разделов данному потребителю, а затем принудительно возвращать эти разделы к смещению 0. Что-то вроде этого:

using (var consumer = new Consumer<string,string>(configuration))
{
_consumer.OnPartitionsAssigned += OnPartitionsAssigned;
_consumer.Subscribe("my-topic");
...
}

private void OnPartitionsAssigned(object sender, List<TopicPartition> assignedPartitions)
{
    // Build a new list of Topic/Partition/Offset values with offset = Beginning
    var newPartitionOffsets = new List<TopicPartitionOffset>();
    assignedPartitions.ForEach(x => newPartitionOffsets.Add(new TopicPartitionOffset(x, Offset.Beginning)));

    // Assign to consumer
    consumer.Assign(newPartitionOffsets);
}

Не уверен, что есть более умный способ, но, похоже, это помогает.Конечно, это работает только в том случае, если это событие запускается сразу после запуска подписки, а не в течение срока ее действия.Но даже этим, вероятно, можно управлять.

0 голосов
/ 25 ноября 2018

В вашей конфигурации добавьте это после изменения значения вашей текущей группы потребителей (если оно у вас есть)

AutoOffsetReset = AutoOffsetResetType.Earliest

См. README

...