Я работаю над проектом по переносу Python доказательства концепции (PO C) на PySpark. PO C активно использует Postgres и, в частности, геопространственную библиотеку PostGIS. Большая часть работы состоит из Python выдачи команд на Postgres перед обратным вызовом данных для окончательной обработки.
Некоторые запросы, передаваемые на Postgres, содержат CREATE TABLE
, INSERT
, CREATE TEMP TABLE
и CTE WITH
заявления. Я пытаюсь определить, можно ли передать эти запросы в Postgres из Spark через JDB C.
Может ли кто-нибудь подтвердить, доступна ли эта функция в Spark JDB C другим базам данных? Чтобы было ясно, я хочу передать простые запросы engli sh SQL на Postgres, а не использовать доступные API Spark SQL (так как они не поддерживают все необходимые мне операции). Я использую Spark версии 2.3.0 , PostgreSQL 10.11 и Python 2.7.5 (да, я знаю о EOL для Python 2 это другая история).
Вот что я пробовал до сих пор:
Используя SparkSession.read
создать сеанс Spark для Postgres
postgres = SparkSession.builder \
.appName("myApp") \
.config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
.getOrCreate()
определяет запрос для передачи dbtable
param
qry = """create table test (name varchar(50), age int)"""
pass qry
to dbtable
param для Postgres объекта сеанса искры
postgres.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://....) \
.option("dbtable", qry) \
.option("user", configs['user']) \
.option("password", configs['password']) \
.option("driver", "org.postgresql.Driver") \
.option("ssl", "true") \
.load()
, который возвращает следующая синтаксическая ошибка (такая же ошибка возникает при использовании других SQL команд, перечисленных выше):
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 9, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
Position: 15
Использование SparkSession.sql()
использование того же postgres
объекта, определенного выше
передать запрос в. sql ()
postgres.sql("""create table (name varchar(50), age int)""")
, который возвращает следующее исключение синтаксического анализа:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"
И если я заверну запрос в кавычки, как в postgres.sql("(create table (name varchar(50), age int))")
Затем я получаю другое исключение синтаксического анализа, которое заставляет меня поверить, что нужная мне функциональность невозможна:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"
Мои вопросы сводятся к:
- Мой подход отсутствует несколько вроде конфигурации или другого необходимого шага?
- Можно ли каким-либо образом использовать API
spark.sql()
с Postgres? - Возможно ли то, что я пытаюсь сделать, sh, даже возможно?
Я искал inte rnet в попытках найти примеры использования Spark SQL для выдачи такого рода запросов SQL на PostgreSQL, но не нашел никаких решений. Если есть решение, я был бы рад увидеть пример, в противном случае подтверждения того, что это невозможно, будет более чем достаточно.