Проблемы с сериализацией при подключении к кластеру Spark - PullRequest
1 голос
/ 01 июля 2019

У меня есть приложение Spark, написанное на Scala, которое пишет и читает файлы Parquet. Приложение предоставляет HTTP API и, когда оно получает запросы, отправляет работу в кластер Spark через долгоживущий контекст, который сохраняется в течение всей жизни приложения. Затем он возвращает результаты клиенту HTTP.

Это все работает нормально, когда я использую локальный режим, с local[*] в качестве мастера. Однако, как только я пытаюсь подключиться к кластеру Spark, у меня возникают проблемы с сериализацией. С сериализатором Spark по умолчанию я получаю следующее:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.FilterExec.otherPreds of type scala.collection.Seq in instance of org.apache.spark.sql.execution.FilterExec.

Если я включаю сериализатор Kryo, я получаю java.lang.IllegalStateException: unread block data.

Это происходит при попытке чтения из файлов Parquet, однако я не верю, что это имеет какое-либо отношение к самим файлам Parquet, просто к сериализации кода, отправляемого в кластер Spark.

Из многих поисков в Интернете я понял, что это может быть вызвано несовместимостью между версиями Spark или даже версиями Java. Но используемые версии идентичны.

Приложение написано на Scala 2.12.8 и поставляется с Spark 2.4.3. Кластер Spark работает под управлением Spark 2.4.3 (версия, скомпилированная в Scala 2.12). И компьютер, на котором работают кластер Spark и приложение, использует openJDK 1.8.0_212.

Согласно другому интернет-поиску, проблема могла быть из-за несоответствия в URL spark.master. Поэтому я установил spark.master в spark-defaults.conf на то же значение, которое я использую в приложении для подключения к нему.

Однако, это не решило проблему, и теперь у меня заканчиваются идеи.

1 Ответ

0 голосов
/ 02 июля 2019

Я не совсем уверен, каково основное объяснение, но я исправил его, скопировав банку моего приложения в каталог Spark jars.Тогда я все еще сталкивался с ошибкой, но другой: что-то о пропущенном классе Cats/kernel/Eq.Поэтому я добавил cats-kernel jar в каталог Spark jars.

И теперь все работает.Что-то, что я прочитал в другом потоке переполнения стека, может объяснить это:

Я думаю, что всякий раз, когда вы выполняете любую операцию с картой, используя лямбду, которая ссылается на методы / классы вашего проекта, вы должны предоставитьих в качестве дополнительной банки.Spark выполняет сериализацию самой лямбды, но не собирает ее зависимости.Не уверен, почему сообщение об ошибке не является информативным.

...