Отметка времени сообщения и commit_timestamp в __consumer_offsets Кафки - PullRequest
1 голос
/ 04 июня 2019

В настоящее время я работаю над приложением, которое частично использует Apache Kafka (версия 2.2.0).Одна вещь, которую я должен сделать, это отслеживать, что (и что более важно, когда) другие потребители фиксируют свои текущие смещения.Насколько я могу судить, просто используя Java-клиент, невозможно получить соответствующие временные метки для зафиксированных смещений, потому что метод AdminClient listConsumerGroupOffsets в конечном итоге приводитобъекту OffsetAndMetadata, который не включает метку времени.Поэтому вместо этого я просто начал читать сообщения из темы __consumer_offsets.Если есть лучший способ сделать это, пожалуйста, дайте мне знать.

Теперь, если кто-то читает сообщения в __consumer_offsets напрямую, то у одного внезапно появляются две метки времени.Одна - это временная метка, прикрепленная к фактическому сообщению о фиксации, а другая - commit_timestamp, которая является частью содержимого сообщения.Моей первой мыслью было, что один из них, вероятно, установлен брокером, а другой - клиентом, который его зафиксировал (также, если вы посмотрите на /config/topics/__consumer_offsets в ZooKeeper, он не указывает сообщение LogAppendTimeотметка времени, поэтому можно предположить, что он просто использует значение по умолчанию).Увы, быстрый эксперимент с системным временем, сдвинутым вручную, показывает, что оба фактора устанавливаются брокером.Более того, они не всегда соглашаются (временная метка сообщения иногда немного опережает commit_timestamp).Я пытался погрузиться в код Кафки, чтобы точно понять, что происходит, но он довольно запутанный, и я недостаточно знаком с ним, чтобы быстро понять его.Итак, вот мои вопросы:

  1. Почему метка времени сообщения в __consumer_offsets автоматически LogAppendTime, хотя это явно не указано?Это просто, что производитель, который используется для отправки сообщения фиксации, оставляет отметку времени пустой?
  2. Почему отметка времени сообщения и commit_timestamp, включенные в сообщение, не согласуются?Кажется, я вспомнил, что где-то читал, что раньше можно было явно установить commit_timestamp и, таким образом, вручную контролировать сохранение зафиксированного смещения.
  3. Более важно: есть ли причина использовать одно поверх другого??Например, если все еще возможно установить commit_timestamp вручную, было бы гораздо разумнее использовать метку времени, прикрепленную к сообщению.

Я понимаю, что это очень специфический вопрос и, вероятно,не имеет большого значения для большинства.Но до сих пор я всегда мог понять, что происходит в фоновом режиме, используя Google и просматривая исходный код Кафки;тем не менее, этот меня немного озадачилТак что любые идеи очень ценятся.

1 Ответ

0 голосов
/ 05 июня 2019

Я думаю, что последняя отметка времени - время истечения.Можете ли вы попробовать следующее, чтобы убедиться?

Установить внутренние темы доступными, установив параметр "exclude.internal.topics=false" в consumer.config.

bin/kafka-console-consumer.sh  --consumer.config /tmp/consumer.config \
     --bootstrap-server localhost:9092 \
     --topic __consumer_offsets

Я мог видеть результат следующим образом:

[mygroup1,mytopic1,11]::[OffsetMetadata[55166421,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup1,mytopic1,13]::[OffsetMetadata[55037927,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup2,mytopic2,0]::[OffsetMetadata[126,NO_METADATA],CommitTime 1502060076343,ExpirationTime 1502146476343]

У меня нет конкретной версии, как вы упоминали в своем вопросе, установленной на моей машине, поэтому, пожалуйста, проверьте ее на своем конце.

...