Spark SQL JDB C (PySpark) до Postgres - Создание таблиц и использование CTE - PullRequest
0 голосов
/ 06 февраля 2020

Я работаю над проектом по переносу Python доказательства концепции (PO C) на PySpark. PO C активно использует Postgres и, в частности, геопространственную библиотеку PostGIS. Большая часть работы состоит из Python выдачи команд на Postgres перед обратным вызовом данных для окончательной обработки.

Некоторые запросы, передаваемые на Postgres, содержат CREATE TABLE, INSERT, CREATE TEMP TABLE и CTE WITH заявления. Я пытаюсь определить, можно ли передать эти запросы в Postgres из Spark через JDB C.

Может ли кто-нибудь подтвердить, доступна ли эта функция в Spark JDB C другим базам данных? Чтобы было ясно, я хочу передать простые запросы engli sh SQL на Postgres, а не использовать доступные API Spark SQL (так как они не поддерживают все необходимые мне операции). Я использую Spark версии 2.3.0 , PostgreSQL 10.11 и Python 2.7.5 (да, я знаю о EOL для Python 2 это другая история).

Вот что я пробовал до сих пор:

Используя SparkSession.read

создать сеанс Spark для Postgres

postgres = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
    .getOrCreate()

определяет запрос для передачи dbtable param

qry = """create table test (name varchar(50), age int)"""

pass qry to dbtable param для Postgres объекта сеанса искры

postgres.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://....) \
    .option("dbtable", qry) \
    .option("user", configs['user']) \
    .option("password", configs['password']) \
    .option("driver", "org.postgresql.Driver") \
    .option("ssl", "true") \
    .load()

, который возвращает следующая синтаксическая ошибка (такая же ошибка возникает при использовании других SQL команд, перечисленных выше):

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
  Position: 15

Использование SparkSession.sql()

использование того же postgres объекта, определенного выше

передать запрос в. sql ()

postgres.sql("""create table (name varchar(50), age int)""")

, который возвращает следующее исключение синтаксического анализа:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"

И если я заверну запрос в кавычки, как в postgres.sql("(create table (name varchar(50), age int))") Затем я получаю другое исключение синтаксического анализа, которое заставляет меня поверить, что нужная мне функциональность невозможна:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"

Мои вопросы сводятся к:

  1. Мой подход отсутствует несколько вроде конфигурации или другого необходимого шага?
  2. Можно ли каким-либо образом использовать API spark.sql() с Postgres?
  3. Возможно ли то, что я пытаюсь сделать, sh, даже возможно?

Я искал inte rnet в попытках найти примеры использования Spark SQL для выдачи такого рода запросов SQL на PostgreSQL, но не нашел никаких решений. Если есть решение, я был бы рад увидеть пример, в противном случае подтверждения того, что это невозможно, будет более чем достаточно.

1 Ответ

0 голосов
/ 06 февраля 2020

Возможно ли то, что я пытаюсь сделать sh, даже возможно?

Я бы сказал нет. Spark - это среда для обработки данных, поэтому его API в основном разработан для операций read и write с источниками данных. В вашем случае у вас есть несколько операторов DDL, и Spark не должен выполнять такие операции.

Например, опцией dbtable из вашего первого примера должно быть имя таблицы или какой-либо запрос SELECT.

Если вам нужно выполнить некоторые запросы DDL, DCL, TCL, то вы должны сделать это другим способом, например, через модуль psycopg2.

Может искра. sql ( ) API можно каким-либо образом использовать с Postgres?

spark.sql - это метод для выполнения кода Spark SQL над зарегистрированными в таблицах или представлениях SparkSession. Он работает с любыми поддерживаемыми источниками данных, не только с jdb c, но и на стороне Spark с синтаксисом Spark SQL. Например

val spark = SparkSession
        ...
        .getOrCreate()

spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://ip/database_name")
  .option("dbtable", "schema.tablename")
  .load()
  .createOrReplaceTempView("my_spark_table_over_postgresql_table")

// and then you can operate with a view:
val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")
...