Клиент Kafka .net не получает никаких сообщений при использовании Assign () - PullRequest
1 голос
/ 20 марта 2019

Я пытаюсь заставить мой сервис перечитывать тему kafka с начала до конца при запуске для инициализации внутренних структур данных. Я использую Confluent .NET клиент. Насколько я понимаю, следующий код должен подписать меня на тему смещения к началу:

consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));

Но по какой-то причине я не получаю ни сообщений, ранее существовавших в теме, ни новых. Мое понимание метода Assign () неверно? Есть ли способ достичь желаемого результата с помощью Subscribe () без необходимости полного сброса смещений с помощью kafka CLI?

Вот полный тестовый клиент, у меня всегда выводится «Нет сообщений ...», несмотря на то, что в теме есть сообщения и приходят новые сообщения.

    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
        var topic = "test-topic";
        consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                Console.WriteLine("No messages...");
            else
                Console.WriteLine($"Offset: {result.Offset}");
        }
    }

Ответы [ 2 ]

0 голосов
/ 22 апреля 2019

Проблема была в том, что я вызвал Assign () с Partition.Any, работает следующий код:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));

0 голосов
/ 20 марта 2019

Почему вы хотите использовать Assign?Следующее должно работать для вас:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    { 
        GroupId = "test-consumer",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
    {
        c.Subscribe("test-topic");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            c.Close();
        }
    }
}
...