Как потреблять из определенного TopicPartitionOffset с Confluent.Kafka в .Net - PullRequest
0 голосов
/ 29 декабря 2018

Мне нужно, чтобы мой потребитель потреблял определенное количество TopicPartitionOffset(here from offset 278).Предположим, что Сообщения были созданы каким-либо производителем в определенной теме, например, ="Test_1" ранее.Вот мой код

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

в строке ----> var cr = consumer.Consume(); Потребитель потребляет, но ничего не происходит.В чем проблема.

Я уже сделал AutoOffsetReset = AutoOffsetResetType.Earliest в ConsumerConfig, и Consumer Consumer потребляет все сообщения со всех смещений, но это не то, что я ищу.

1 Ответ

0 голосов
/ 31 декабря 2018

Решено: Я нашел решение, которое описано ниже:

  • добавил это

consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset))) Перед попыткой потреблять, и

  • Удалены эти

consumer.Subscribe("Test_1") и consumer.Seek(...)

Итак, обновленный код - это что-то вроде этого, которое прекрасно работает:

using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume();
                        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error}");
                    }
                }

                consumer.Close();
            }
...