PySpark 2.4: программно добавлены Maven JAR координаты перестали работать - PullRequest
0 голосов
/ 17 января 2019

Ниже приведен мой фрагмент запуска PySpark, который довольно надежен (я давно его использую). Сегодня я добавил две координаты Maven, показанные в опции spark.jars.packages (фактически «подключая» поддержку Kafka). Теперь это обычно запускает загрузку зависимостей (выполняется Spark автоматически):

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------

num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_POST', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVE_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to autimatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
                   ('spark.app.name', 'myApp'),
                   ('spark.submit.deployMode', 'client'),
                   ('spark.ui.showConsoleProgress', 'true'),
                   ('spark.eventLog.enabled', 'false'),
                   ('spark.logConf', 'false'),
                   ('spark.jars.repositories', 'file:/' + JARS_IVE_REPO),
                   ('spark.jars.ivy', JARS_IVE_REPO),
                   ('spark.jars.packages', spark_jars_packages), ])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

Однако плагины не загружаются и / или не загружаются, когда я запускаю фрагмент (например, ./python -i init_spark.py), как они должны.

Этот механизм работал, но потом остановился. Чего мне не хватает?

Заранее спасибо!

1 Ответ

0 голосов
/ 18 января 2019

Это тот тип поста, где QUESTION будет стоить больше, чем ОТВЕТ, потому что приведенный выше код работает, но его нет нигде в документации или примерах Spark 2.x.

Выше показано, как я программно добавил функциональность в Spark 2.x посредством координат Maven. У меня было это работает, но потом оно перестало работать. Почему?

Когда я запустил вышеуказанный код в jupyter notebook, ноутбук - за кулисами - уже запустил этот идентичный фрагмент кода с помощью моего PYTHONSTARTUP сценария. Этот скрипт PYTHONSTARTUP имеет тот же код, что и выше, , но пропускает maven координаты (по назначению).

Вот как возникает эта тонкая проблема:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

Поскольку Spark Session уже существовал, вышеприведенный оператор просто повторно использовал этот существующий сеанс (.getOrCreate ()), в который не были загружены jar / библиотеки (опять же, потому что мой скрипт PYTHONSTARTUP преднамеренно их пропускает). Вот почему рекомендуется помещать операторы print в сценарии PYTHONSTARTUP (которые в противном случае не используются).

В конце концов, я просто забыл сделать это: $ unset PYTHONSTARTUP перед запуском демона JupyterLab / Notebook.

Надеюсь, Вопрос поможет другим, потому что именно так программно добавляются функциональные возможности в Spark 2.x (в данном случае Kafka). Обратите внимание, что вам потребуется подключение к Интернету для единовременной загрузки указанных файлов jar и рекурсивных зависимостей из Maven Central.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...