Обратите внимание, что я уже много читал следующую статью и пытался найти нужную информацию на разных форумах, но безуспешно:
https://medium.com/@anishekagarwal/kafka-log-cleaner-issues-80a05e253b8a
Надеюсь, вы поймете мою проблему и дадите несколько подсказок =)
Вот история:
Несколько дней назад мы развернули сервис дедупликации в кластере kafka.
Поскольку мы использовали этот сервис, мы заметили, что темы __consumer_offsets начали расти.
Причина была в том, что очиститель журналов (который использовался для уплотнения этой темы среди других) завершился с ошибкой:
java.lang.IllegalStateException: этот журнал содержит сообщение, превышающее максимально допустимый размер 1000012
Из того, что мы поняли, мы сначала думаем, что это проблема размера сообщения, поэтому мы увеличили значение max.messsage.bytes (до более чем 20 МБ), но затем мы получили ту же проблему (с сообщением об ошибке, корректно обновленным с помощью новое значение).
Итак, мы начали думать, что это может быть какое-то «поврежденное» значение размера сообщения или «неправильно понятый сегмент» (например, версия программы очистки журнала kafka неправильно обрабатывает сообщение)
Мы смогли выделить смещение сегмента, которое вызывает у нас проблему. Это было так странно, потому что когда мы использовали его с простым потребителем, запись составляла около 4 Кбайт, но потребителю потребовалось 7 или 8 минут, чтобы потребитель потреблял только эту запись (во время этого опроса tcpdump ясно показывал много> 1000 байт поступающих пакетов). от брокера).
Итак, мы начали использовать класс dumpSegment, чтобы посмотреть на сегмент, и он выглядел так (я заменил некоторые значения, чтобы немного анонимизировать):
Dumping 00000000004293321003.log
Starting offset: 4293321003
baseOffset: 4310760245 lastOffset: 4310760245 count: 1 baseSequence: -1 lastSequence: -1 producerId: 66007 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 0 CreateTime: 1556544968606 size: 78 magic: 2 compresscodec: NONE crc: 2072858171 isvalid: true
| offset: 4310760245 CreateTime: 1556544968606 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
baseOffset: 4310760295 lastOffset: 4310760295 count: 1 baseSequence: -1 lastSequence: -1 producerId: 65010 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 78 CreateTime: 1556544968767 size: 78 magic: 2 compresscodec: NONE crc: 2830498104 isvalid: true
| offset: 4310760295 CreateTime: 1556544968767 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
baseOffset: 4310760731 lastOffset: 4310760731 count: 1 baseSequence: -1 lastSequence: -1 producerId: 64005 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 156 CreateTime: 1556544969525 size: 78 magic: 2 compresscodec: NONE crc: 3044687360 isvalid: true
| offset: 4310760731 CreateTime: 1556544969525 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
baseOffset: 4310760732 lastOffset: 4310760732 count: 1 baseSequence: -1 lastSequence: -1 producerId: 66009 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 234 CreateTime: 1556544969539 size: 78 magic: 2 compresscodec: NONE crc: 1011583163 isvalid: true
| offset: 4310760732 CreateTime: 1556544969539 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
Итак, мы видели много гроздей, как указано выше
А затем ошибочное смещение, приводящее к сбою очистителя журнала:
baseOffset: 4740626096 lastOffset: 4740626096 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 50 isTransactional: false isControl: false position: 50471272 CreateTime: 1557322162912 size: 4447 magic: 2 compresscodec: NONE crc: 3030806995 isvalid: true
| offset: 4740626096 CreateTime: 1557322162912 keysize: 25 valuesize: 4352 sequence: -1 headerKeys: [] key: {"metadata":"MYGROUPID"} payload: {"protocolType":"consumer","protocol":"range","generationId":32,"assignment":"{CLIENTID=[TOPICA-0, TOPICA-1, TOPICA-2, TOPICA-3, TOPICA-4, TOPICA-5, TOPICA-6, TOPICA-7, TOPICA-8, TOPICA-9, TOPICA-10, TOPICA-11], AND THIS FOR ABOUT 10 OTHER TOPICS}"} ==> approximative 4K bytes
Это не похоже на стандартную схему данных __consumer_offsets ([groupId, topicName, partitionNumber] :: offset схема), и я думаю, это потому, что новый сервис использовал транзакции kafka.
Мы думаем, что это может произойти из-за того, что наш кластер kafka равен 0.9.11 (или, может быть, 1.0.1), а наша служба дедупликации использует API-интерфейс kafka 2.0.0 (и использует транзакции).
Итак, у меня есть несколько вопросов:
Как __consumer_offsets обрабатывает зафиксированные смещения при работе с транзакциями kafka? Я вообще не понимаю структуру .. Похоже, что существует несколько сообщений маркера COMMIT (но без понятия, какая это тема или раздел .. Так, как это работает: /?) Всегда следует за этой не транзакционной записью, которая включает тег метаданных .. Есть ли документация по этой структуре?
Возможно ли, что версия очистителя журнала 1.1.0 кластера kafka некорректно обрабатывает подобные сообщения транзакций в __consumer_offsets (передается через API 2.0.0)?
Любая подсказка / исправление приветствуются здесь.
Regads
Янник