Повторяющиеся сообщения, используемые клиентом Kafka в Spark Structured Streaming: - PullRequest
0 голосов
/ 28 февраля 2020

Проблема в том, что через некоторое время из кластера Kafka (5 узлов) неожиданно принимаются повторяющиеся сообщения. Наше приложение не должно обрабатывать повторяющиеся сообщения. Потребитель Кафки реализован с помощью Spark Structured Streaming.

Заметил, что в этот момент все повторяющиеся сообщения приходят из одного раздела. Я обнаружил, что в определенный момент времени смещение конкретного раздела сбрасывается до самого раннего (стабильное смещение), и сообщения снова используются из смещения сброса.

Ниже приведены журналы у потребителя в режиме отладки. Дубликаты вылетели из раздела-20, и на этом этапе ниже 2 строк немного отличаются от обычных журналов. После четырех часов работы встроенных дубликатов и в этот конкретный момент клиент неожиданно отправляет ListOffsetRequest с partitionTimestamps -2. Я проверил в полных журналах, только в этот момент значение было отправлено -2 (самое раннее) вместо -1 (самое последнее) для этого конкретного раздела. Это выглядит странно и не может найти root причину такого поведения.

<!-- language: lang-none -->
20/02/20 18:27:53 DEBUG internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-bca38026-e65e-4e24-8d86-aaa-driver-0]
 Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={Hid1328.gngp-raw-20=-2}, isolationLevel=READ_UNCOMMITTED) to broker famescpolyfi5.teliacompany.net:9095 (id: 5 rack: null)

20/02/20 18:27:53 DEBUG internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-bca38026-e65e-4e24-8d86-9daf0d34b406--1757490130-driver-0] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={Hid1328.gngp-raw-5=-1, Hid1328.gngp-raw-20=-1, Hid1328.gngp-raw-25=-1, Hid1328.gngp-raw-10=-1, Hid1328.gngp-raw-15=-1, Hid1328.gngp-raw-0=-1}, isolationLevel=READ_UNCOMMITTED) to broker famescpolyfi5.teliacompany.net:9095 (id: 5 rack: null) 

Я пытался найти проблему с другой точки зрения, а именно

  1. Topi c конфигурация (количество разделов и время хранения) в брокере.

  2. Контрольная точка записывается в HDFS.

  3. Конфигурация клиента Kafka

Поскольку приложение находится в режиме структурированной потоковой передачи, невозможно установить auto.offset.reset config как самое раннее.

Полное решение скомпрометировано из-за проблемы. Если кто-то сталкивался с этой проблемой, пожалуйста, поделитесь, как ее избежать и root причина этой проблемы.

...