Использование Spark с Flask с JDBC - PullRequest
0 голосов
/ 16 января 2019

Что я делаю?

Я хочу создать службу API с использованием Flask для извлечения данных из одной базы данных, провести некоторый анализ данных и затем загрузить новые данные в отдельную БД.

Что не так?

Если я сам запускаю Spark, я могу получить доступ к БД, выполнить анализ и загрузить в БД. Но те же функции не будут работать при их использовании в приложении Flask (API-маршруты).

Как я это делаю?

Сначала я запускаю мастера и рабочего Spark. Я вижу, что у меня есть один рабочий в localhost:8080 под мастером.

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

../sbin/start-master.sh
../sbin/start-slave.sh spark://xxx.local:7077

Для применения в колбе:

app = Flask(__name__)

spark = SparkSession\
    .builder\
    .appName("Flark - Flask on Spark")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


@app.route("/")
def hello():
    dataframe = spark.read.format("jdbc").options(
        url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",
        database="foodnome_test",
        dbtable='"Dishes"'
    ).load()

    print([row["description"]
           for row in dataframe.select('description').collect()])

    return "hello"

Для запуска этого приложения я использую драйвер JDBC с spark-submit:

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar server.py

Какую ошибку я получу?

На стороне Flask ошибка - Внутренняя ошибка сервера. На стороне искры,

File "/Users/leoqiu/Desktop/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.67, executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:55)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)

Ответы [ 2 ]

0 голосов
/ 16 января 2019

Вот что сработало для меня, как и предполагалось. Нужно --jars

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar --jars postgresql-42.2.5.jar server.py
0 голосов
/ 16 января 2019

--driver-class-path здесь недостаточно. Jar драйвера также должен быть добавлен в путь к классу executor. Обычно это обрабатывается вместе с помощью:

  • spark.jars.packages / --packages
  • spark.jars / --jars

хотя вы все еще можете использовать spark.executor.extraClassPath.

Объяснение

С JDBC драйвер источника отвечает за чтение метаданных (схемы) и исполнителей за фактический процесс извлечения данных.

Такое поведение является общим для различных внешних источников данных, поэтому всякий раз, когда вы используете не встроенный формат, вы должны распределять соответствующие jar-файлы по кластеру.

См. Также

Как использовать источник JDBC для записи и чтения данных в (Py) Spark?

...