Vertica sql в pyspark не работает - PullRequest
0 голосов
/ 11 июля 2019

Я новичок в использовании pyspark под управлением sql to vertica.У меня импортирован модуль vertica_python, у меня есть таблица sql to vertica, мне нужно преобразовать результат запроса в фрейм данных pyspark.Я использую команду spark-submit для запуска, но получаю сообщение об ошибке «MemoryError».Может кто-нибудь помочь?

Я попытался увеличить и уменьшить память привода и исполнителя в команде spark-submit, не сработал, получил то же сообщение об ошибке.

from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
sc=SparkContext(appName="build_history_pyspark")
sqlContext=SQLContext(sc)

start_load_time_UTC='2019-06-01 00:00:00'
start_load_time_UTC=datetime.strptime(start_load_time_UTC,'%Y-%m-%d %H:%M:%S')


url="jdbc:vertica://144.60.100.141:5433/SRVVERTICA" . #This is the vertica database host IP and port
properties={"user":"hhhhh","password":"yyyyy","driver": "com.vertica.jdbc.Driver"}

sql_for_kafka="SELECT STARTTIME, LONGITUDE,LATITUDE FROM my_table WHERE (VERTICA_LOAD_TIME >'%s')" %(start_load_time_UTC)

sdf_orig=sqlContext.read.format("JDBC").options(url=url, query=sql_for_kafka, **properties).load()
sdf_orig.show(10)

Ошибка: MemoryError

моя команда:

spark-submit --driver-memory 10g --num-executors 10 --executor-cores 4 --executor-memory 10g build_history_pyspark.py

Я сделал дальнейшую отладку, я думаю, что ошибка связана с переменной start_load_time_UTC.Мне действительно нужно использовать переменную здесь, а не прямое значение.Но не знаю правильный синтаксис.

Я также изменил оператор запроса на: sql_for_kafka = "ВЫБЕРИТЕ STARTTIME, LONGITUDE, LATITUDE FROM my_table WHERE VERTICA_LOAD_TIME> $ start_load_time_UTC"

Я получил другую ошибку:

Py4JJavaError: An error occurred while calling o57.load.
: java.sql.SQLDataException: [Vertica][VJDBC](3679) ERROR: Invalid 
input syntax for timestamp: "$start_load_time_UTC"
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.io.ProtocolStream.readExpectedMessage(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepareImpl(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.<init>(Unknown Source)
at com.vertica.jdbc.jdbc4.S4PreparedStatement.<init>(Unknown Source)
at com.vertica.jdbc.VerticaJdbc4PreparedStatementImpl.<init>(Unknown Source)
at com.vertica.jdbc.VJDBCObjectFactory.createPreparedStatement(Unknown Source)
at com.vertica.jdbc.common.SConnection.prepareStatement(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

Причина: com.vertica.support.exceptions.DataException: [Vertica] VJDBC ОШИБКА: неверный синтаксис ввода для отметки времени: «$ start_load_time_UTC» ... еще 28

Я наконец-то решил проблему, изменив оператор запроса ниже:

sql_for_kafka="(SELECT STARTTIME, LONGITUDE,LATITUDE FROM my_table WHERE (VERTICA_LOAD_TIME >'{0}') )temp" .format(start_load_time_UTC)
...