Переменная Spark Broadcast, вызывающая сбой при перезагрузке состояния из каталога контрольных точек после перезагрузки - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть приложение для потокового воспроизведения (использующее spark 1.6.1), которое считывает данные из Kafka.Я использую каталог контрольных точек искры на hdf для восстановления после сбоев.

В коде используется переменная Broadcast с картой в начале каждого пакета следующим образом

public static void execute(JavaPairDStream<String, MyEvent> events) {
  final Broadcast<Map<String, String>> appConfig =  MyStreamConsumer.ApplicationConfig.getInstance(new    JavaSparkContext(events.context().sparkContext()));

Когда я отправляюработа во время выполнения все работает нормально и все события из кафки обрабатываются корректно.Проблема возникает при восстановлении после сбоев (проверено перезагрузкой компьютера, на котором запускается спарк) - приложение потоковой передачи искры действительно запускается правильно, и все выглядит хорошо в пользовательском интерфейсе при выполнении задания, но как только данные отправляются с помощью нижеуказанного исключения,встречается при вызове этого метода (& сбой задания):

appConfig.value() (the broadcast variable from the start!)

Сбой из-за ошибки задания Spark

  Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to java.util.Map

Если я уничтожу драйвер в интерфейсе искры и повторно отправлю заданиеиз командной строки все снова работает нормально.Но для нашего продукта требуется, чтобы он мог автоматически восстанавливаться после сбоев или даже просто перезагружаться с любого узла кластера, поэтому я должен исправить это.Эта проблема, безусловно, связана с использованием переменной Broadcast и загрузкой состояния из каталога контрольных точек spark после перезагрузки

Также обратите внимание, что я правильно создаю экземпляр широковещания (lazily / singleton):

public static Broadcast<Map<String, String>> getInstance(JavaSparkContext sparkContext) {
    if (instance == null) {

Я понимаю, что проблема, похоже, связана с: Возможно ли восстановить значение широковещания с контрольной точки потоковой передачи Spark

, но мне не удалось устранить проблемуследуя инструкциям там

1 Ответ

0 голосов
/ 07 декабря 2018

Отвечая на мой собственный вопрос, поскольку другие могут посчитать его полезным: странно, но следующее, похоже, устранило проблему, если я переместил следующий код в начало метода execute: appConfig.value () и назначил еготам нормальная переменная карты.

Тогда, если я просто использую переменную map в моем анонимном коде FlatMapFunction вместо appConfig.value () - все прекрасно работает даже после перезагрузки.

Опять же,не уверен, почему это работает, но работает ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...