Повторно потреблять сообщения Кафки с заданного времени - PullRequest
2 голосов
/ 05 марта 2020

Я использую Confluent.Kafka. NET клиент версии 1.3.0. Я хотел бы начать потреблять сообщения с определенного времени.

Для этого я мог бы использовать OffsetsForTimes для получения желаемого смещения и Commit это смещение для этого раздела:

private void SetOffset()
{
    const string Topic = "myTopic";
    const string BootstrapServers = "server1, server2";

    var adminClient = new AdminClientBuilder(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    var consumer = new ConsumerBuilder<byte[], byte[]>(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "group.id", this.groupId },
            { "enable.auto.commit", "false" },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    // Timestamp to which the offset should be set to
    var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);

    var newOffsets = new List<TopicPartitionOffset>();
    var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
    foreach (var topicMetadata in metadata.Topics)
    {
        if (topicMetadata.Topic == Topic)
        {
            foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
            {
                var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);

                IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
                    new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
                    TimeSpan.FromSeconds(5));

                newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
            }
        }
    }

    consumer.Commit(newOffsets);

    // Consume messages
    consumer.Subscribe(Topic);
    var consumerResult = consumer.Consume();
    // process message
    //consumer.Commit(consumerResult);
}

Это прекрасно работает, если я хочу пропустить сообщения и перейти к заданному смещению, если смещение, к которому я хотел бы перейти, равно после последнего подтвержденного сообщения.

Однако вышеупомянутый подход не будет работать, если заданная временная метка будет до временной метки последнего принятого сообщения. В приведенном выше коде, если timeStamp предшествует отметке времени последнего подтвержденного сообщения, то OffsetsForTimes вернет смещение этого последнего подтвержденного сообщения + 1. Даже если я вручную установлю смещение на более низкое смещение, тогда consumer.Commit(newOffsets), кажется, не имеет никакого эффекта, и я получаю первое незафиксированное сообщение при потреблении.

Есть ли способ добиться этого из кода?

1 Ответ

0 голосов
/ 05 марта 2020

Я не эксперт, но я попытаюсь объяснить, как вы могли бы это сделать.

Прежде всего, мы должны упомянуть методы подписки и назначения.

Когда Вы используете подписку, вы передаете одну или несколько тем. При этом список разделов каждой темы присваивается потребителю в зависимости от количества потребителей в его группе. Раздел topi c - это объект, образованный именем topi c и номером раздела.

consumer.Subscribe(Topic);

Вы можете использовать assign, чтобы передать разделы, которые будет читать потребитель. Этот метод не использует функциональные возможности управления группами потребителей (где нет необходимости в group.id). Если я не ошибаюсь, в методе assign вы можете указать начальное смещение.

consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
consumer.Assign(topicPartition, new Offset(lastConsumedOffset));

Другой вариант - использовать Метод seek () для установки смещения

consumer.Seek(topicPartitionOffset);

Если вы собираетесь смешивать подписку и назначать, помните, что вы должны использовать отписку раньше.

Другой вариант, если вы хотите Чтобы повторно использовать все сообщения, создайте потребителя в новой другой группе потребителей.

ПРИМЕР (ОБЗОР)

Я оставляю вас и пример на данный момент, я буду проверьте это позже. Я сделал пример в java, потому что я более знаком с ним. В этом примере я не использую подписку, я использую assign. Сначала извлекаются разделы topi c, мы устанавливаем дату и время начала чтения сообщений, создаем карту, указывающую, что это время и дата для каждого раздела.

С созданной картой мы получаем смещение каждого раздела на указанная дата-время с методом offsetsForTimes. Со смещением каждого раздела мы используем search для перехода к этому смещению на каждом разделе и, наконец, мы потребляем сообщения.

Сейчас у меня нет времени, чтобы проверить код, но я сделаю это. Надеюсь, это поможет.

        AdminClient client = AdminClient.create(getAdminClientProperties());
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(
                getConsumerProperties());

        String TOPIC = "topic";

        // get info of all partitions of a topic
        List<PartitionInfo> partitionsInfo = consumer.partitionsFor(TOPIC);

        // create TopicPartition list
        Set<TopicPartition> partitions = new HashSet<>();
        for (PartitionInfo p : partitionsInfo) {
            partitions.add(new TopicPartition(p.topic(), p.partition()));
        }

        // Consumer will read from all partitions
        consumer.assign(partitions);
        DateTime timeToStartReadMessagesFrom = new DateTime(2020, 3, 1, 0, 0, 0);

        Map<TopicPartition, Long> timestamps = new HashMap<>();
        for (TopicPartition tp : partitions) {
            timestamps.put(tp, timeToStartReadMessagesFrom.getMillis());
        }
        // get the offset for that time in each partition
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsets.get(tp).offset());
        }

        while (true) {
            final ConsumerRecords<String, GenericRecord> consumerRecords = consumer.poll(1000);
            // do something
            break;
        }
        consumer.close();
        System.out.println("DONE");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...