Получать сообщения Kafka 'writeahead' с потребителем JVM - PullRequest
0 голосов
/ 11 января 2019

У нас есть потоковое приложение, которое использует семантику "точно один раз", когда один тематический раздел остановился. Мы замечаем, что смещения увеличиваются с шагом в два, и понимаем, что нечетные сообщения являются частью двухфазной фиксации транзакций Kafka.

Мы написали Consumer<Byte[], Byte[]> (используя kafka-клиентов 2.1.0), чтобы выгрузить все эти сообщения на диск с isolation.level = "read_uncommitted", но эти сообщения с нечетными номерами не выбираются. Мы можем что-нибудь сделать, чтобы получить их?

1 Ответ

0 голосов
/ 11 января 2019

Контрольные записи недоступны для потребителей.

Чтобы «увидеть» их, вам нужно использовать инструмент DumpLogSegments:

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log

Контрольные партии будут отображаться как обычные партии, но для них флаг isControl будет установлен в значение true.

baseOffset: 1618 lastOffset: 1618 count: 1 baseSequence: 1 lastSequence: 1 ManufacturerId: 1000 производительEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false позиция: 1778601 CreateTime: 1547217145114 isvalid: true размер: 1097 магия: 2 сжимать код: НЕТ crc: 1680083731

baseOffset: 1619 lastOffset: 1619 count: 1 baseSequence: -1 lastSequence: -1 идентификатор_производителя: 1000 время_производителя: 0 isTransactional: true isControl: true позиция: 1779698 CreateTime: 1547217145210 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 2028573478

Вы также можете использовать флаг --deep-iteration для отображения метаданных отдельной записи (или даже --print-data-log для отображения фактических данных записи). В этом случае вы можете увидеть, является ли контрольный пакет фиксацией или восстановлением:

смещение: 1618 позиция: 1778601 CreateTime: 1547217145114 isvalid: true размер ключа: 3 значения: 1024 магия: 2 сжатый код: НЕТ идентификатор_производителя: 1000 ManufacturerEpoch: 0 последовательность: 1 isTransactional: true headerKeys: []

смещение: 1619 позиция: 1779698 CreateTime: 1547217145210 isvalid: true размер ключа: 4 значения: 6 магия: 2 сжатый кодек: НЕТ идентификатор производителя: 1000 providerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: координатор COMMITEpoch: 0

...