Мне нужно, чтобы мой потребитель потреблял определенное количество TopicPartitionOffset(here from offset 278)
.Предположим, что Сообщения были созданы каким-либо производителем в определенной теме, например, ="Test_1"
ранее.Вот мой код
using System;
using Confluent.Kafka;
public class ConsumingTest
{
public static void Main(string[] args)
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
};
using (var consumer = new Consumer<Null, string>(consumerConfig))
{
Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
consumer.Subscribe("Test_1");
var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));
consumer.Assign(topicPartitionOffset);
consumer.Seek(topicPartitionOffset);
while (true)
try
{
var cr = consumer.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine(e.Message);
}
}
}
}
в строке ----> var cr = consumer.Consume();
Потребитель потребляет, но ничего не происходит.В чем проблема.
Я уже сделал AutoOffsetReset = AutoOffsetResetType.Earliest
в ConsumerConfig, и Consumer Consumer потребляет все сообщения со всех смещений, но это не то, что я ищу.