Как работает Kafka S3 Connector после гарантии доставки - PullRequest
0 голосов
/ 16 апреля 2020

Я прочитал их блог и понял их примеры. https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/

Но я пытаюсь обернуть голову вокруг этого сценария, который у меня есть. Мои текущие конфигурации:

"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"

На основании того, что я прочитал о конфигурациях. Соединитель передаст файл из 50 записей или файл после 300000ms (5 минут), который когда-либо будет первым. Если соединитель загружает файл на s3, но не фиксирует его в Kafka, как Кафка повторно загрузит те же записи, которые будут перезаписывать файл s3, поскольку у меня установлен интервал расписания поворота? Не приведет ли это к дублированию в s3?

Ответы [ 2 ]

3 голосов
/ 16 апреля 2020

Документация к соединителю приемника S3 - это еще один хороший ресурс, в котором описывается, как соединитель может гарантировать точную однократную доставку на S3, и, что более важно, какая комбинация функций обеспечивает (или не обеспечивает) эту гарантию.

В частности, один из разделов этого документа гласит:

Чтобы гарантировать семантику с точностью до однократного с TimeBasedPartitioner, соединитель должен быть настроен на использование детерминированной реализации c TimestampExtractor и детерминированная c стратегия ротации. Определителями c меток времени являются записи Кафки (timestamp.extractor=Record) или поля записей (timestamp.extractor=RecordField). Детерминированная c конфигурация стратегии вращения имеет значение rotate.interval.ms (настройка rotate.schedule.interval.ms является недетерминированной c и приведет к аннулированию только одноразовых гарантий).

Ваша конфигурация соединителя приемника S3 действительно использует детерминированность c разделитель (через «partitioner.class»: «io.confluent.connect.storage.partitioner.TimeBasedPartitioner»), но он использует недетерминированный c экстрактор отметки времени Wallclock (через "timestamp.extractor": "Wallclock"). Это не определено c, потому что, если соединитель должен перезапускаться (например, из-за сбоя) и повторно обрабатывает определенную запись, он будет повторно обрабатывать эту запись позднее, и средство извлечения метки времени настенного тактового генератора выберет другое время для эта запись.

Во-вторых, ваш соединитель использует параметр rotate.schedule.interval.ms, который в документации отмечается как несовместимый с единовременной доставкой. Например, если соединителю нужно повторно обработать серию записей Kafka, он может разбить записи на объекты S3, отличные от первого раза, и это означает, что соединитель S3 заканчивает записывать разные объекты S3.

In Таким образом, коннектор приемника S3 с вашей конфигурацией не предоставит ровно один раз гарантии доставки.

0 голосов
/ 02 мая 2020

Используйте rotate.interval.ms вместе с timestamp.extractor, установленным на «Запись». Кроме того, убедитесь, что для topi c, с которого вы читаете, установлен тип отметки времени «LOG_APPEND_TIME»

Я не уверен, что рабочая конфигурация должна иметь свойство уровня изоляции потребителя, установленное для чтения зафиксированного. Зависит от того, будет ли это автоматически делать разъем S3.

Даже при всем этом все может go быть неправильным, если временная метка не монотонно увеличивается во время выборов лидера в кластере брокеров. Следите за статусом этой проблемы

...