Искровой поток с Кафкой, как обрабатывать исключения драйвера - PullRequest
0 голосов
/ 25 мая 2018

У меня есть потоковое приложение SPARK, которое читает непрерывные входящие данные из прямого потока kafka.

Это мои настройки

   spark-core_2.10 - 2.1.1
   spark-streaming_2.10 - 2.1.1
   spark-streaming-kafka-0-10_2.10 - 2.0.0
   kafka_2.10 - 0.10.1.1

В большинстве случаев он работает довольно хорошо, но иногда, когда перезапускается мое другое приложение, которое пишет в kafka, я получаю следующие ошибки.

 WARN NetworkClient: Error while fetching metadata with correlation id 139022 : {topic4908100105=LEADER_NOT_AVAILABLE}
 18/05/24 11:59:33 WARN NetworkClient: Error while fetching metadata with correlation id 139022 : {topic000001=LEADER_NOT_AVAILABLE}
.
.
.

 ERROR JobScheduler: Error generating jobs for time 1527163130000 ms
 java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

Я видел другие сообщения в SO, и люди рекомендовали перезапустить приложение kafka или spark для восстановления после этой ошибки.Но мое приложение должно постоянно работать без какого-либо ручного вмешательства!

Есть ли способ обработать это исключение в моем потребительском классе Spark (java) и перезапустить потоковую передачу или приложение?

1 Ответ

0 голосов
/ 25 мая 2018

WARN NetworkClient: Ошибка при получении метаданных с идентификатором корреляции 139022: {topic4908100105 = LEADER_NOT_AVAILABLE} 18/05/24 11:59:33 WARN NetworkClient: Ошибка при получении метаданных с идентификатором корреляции 139022: {topic000001 = LEADIL_N }_AV

Это проблема Кафки, для каждого лидера раздела и его последователей.Запрос обслуживается лидером.Если лидер недоступен из-за проблемы с зоопарком, Кафка выдаст эту ошибку.

Вам нужно исправить проблему Кафки, чтобы получить данные.Однако вы можете обрабатывать сценарии исключений, используя следующие свойства конфигурации:

  • - conf spark.yarn.maxAppAttempts = 10
  • - conf spark.yarn.am.attemptFailuresValidityInterval = 1h
  • - conf spark.yarn.max.executor.failures = 10
  • - conf spark.yarn.executor.failuresValidityInterval = 1 ч
...