Я пытаюсь заставить мой сервис перечитывать тему kafka с начала до конца при запуске для инициализации внутренних структур данных. Я использую Confluent .NET клиент.
Насколько я понимаю, следующий код должен подписать меня на тему смещения к началу:
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
Но по какой-то причине я не получаю ни сообщений, ранее существовавших в теме, ни новых.
Мое понимание метода Assign () неверно? Есть ли способ достичь желаемого результата с помощью Subscribe () без необходимости полного сброса смещений с помощью kafka CLI?
Вот полный тестовый клиент, у меня всегда выводится «Нет сообщений ...», несмотря на то, что в теме есть сообщения и приходят новые сообщения.
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
var topic = "test-topic";
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result == null)
Console.WriteLine("No messages...");
else
Console.WriteLine($"Offset: {result.Offset}");
}
}