офсет и кафка неизвестны для некоторых разделов темы - PullRequest
0 голосов
/ 02 октября 2018

enter image description here

Я использую потребитель с https://github.com/confluentinc/confluent-kafka-go. Версия kafka - 0.10.1.0.

Вот конфигурация моего потребителя:

kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               broker,
    "group.id":                       "udwg20",
    "session.timeout.ms":              60000,
    "go.events.channel.enable":        true,
    "go.application.rebalance.enable": true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset":      "earliest",
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 10000}})

В начале были показаны все текущие смещения и задержки, но после нескольких часов работы смещение и задержка некоторых разделов (которые не получили ни одного нового сообщения) становятся неизвестными.Если в раздел поступило сообщение, его смещение и задержка которого неизвестны, смещение и задержка снова будут видны, и сообщение будет использовано.

Когда есть некоторые разделы с неизвестным текущим смещением илаг, я перезапускаю потребителя, в это время все текущие разделы с неизвестным текущим смещением и статусом задержки будут снова использоваться с начала, но другие разделы, кажется, работают нормально.

Я также использовал потребителя Python, которыйиспользует сообщение из этой темы с другим идентификатором группы потребителей.потребитель Python, кажется, работает хорошо без какого-либо раздела с неизвестным текущим смещением и задержкой.

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Я использую команду ниже, чтобы увидеть, периодически ли фиксируется смещение моего идентификатора группы потребителей.

echo exclude.internal.topics=false > consumer.properties

kafka-console-consumer --consumer.config consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Несмотря на то, что я установил значение enable.auto.commit на true, оно непериодически совершать для разделов, которые его lag = 0.Текущее смещение для этих разделов удаляется через 2–3 часа, даже если группа потребителей все еще активна.

Чтобы решить эту проблему, я установил enable.auto.commit на false и написал свою собственную функцию для фиксации смещения послекаждые 5 секунд.

Вот идеал: когда потребитель получает новое событие Message или событие достижения конца раздела (PartitionEOF), из данных события я сохраняю последнее текущее смещениев карте фиксации (ключ: topic_partition значение: kafka.TopicPartition{ Topic, Partition, Offset }), и есть функция для периодической фиксации этой карты (может быть через каждые 5 секунд).Когда потребитель получает событие RevokedPartitions, я удаляю соответствующий ключ topic_partition из карты фиксации.

0 голосов
/ 02 октября 2018

offsets.retention.minutes используется для очистки неактивных групп потребителей.Если группа потребителей не фиксирует какое-либо смещение для offsets.retention.minutes (по умолчанию 24 часа), kafka очистит свое смещение.Вот почему смещение и журнал установлены на unknown.

Однако вы можете увеличить срок хранения смещения, но помните, что старые потребители будут резервировать место в теме __consumer_offsets.

...