pyspark логическое соединение для vertica sql - PullRequest
0 голосов
/ 13 февраля 2019

spark1.6, извлечение данных из моей базы данных Vertica для работы с ним, приведенный ниже запрос хорошо работает на vertica db, но не работает на pyspark, Spark DataFrames поддерживают push-down для предикатов с JDBC-источниками, но предикат term являетсяиспользуется в строгом смысле SQL.Это означает, что он охватывает только предложение WHERE.Более того, похоже, что оно ограничено логическим соединением (нет IN и OR, я боюсь) и простыми предикатами, оно показывает эту ошибку: java.lang.RuntimeException: опция 'dbtable' не указана

conf = (SparkConf()
.setAppName("hivereader")
.setMaster("yarn-client")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.io.compression.codec", "snappy")
.set("spark.rdd.compress", "true")
.set("spark.executor.cores" , 7)
.set("spark.sql.inMemoryStorage.compressed", "true")
.set("spark.sql.shuffle.partitions" , 2000)
.set("spark.sql.tungsten.enabled" , 'true')
.set("spark.port.maxRetries" , 200))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

url = "*******"
properties = {"user": "*****", "password": "*******", "driver": "com.vertica.jdbc.Driver" }

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

1 Ответ

0 голосов
/ 14 февраля 2019

Проблема в том, что, хотя вы и говорите, что этот запрос работает в Vertica, ваш запрос не написан в синтаксисе SQL, который Vertica распознает.Ваш запрос должен быть переписан как:

SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION
FROM traffic.stats
WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'

Исправляя все эти ошибки, ваш метод sqlContext.read должен выглядеть следующим образом.

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

Или вы можете использовать псевдоним таблицы как подпрограмму-просите и используйте dbtable вместо query.

df = sqlContext.read.format("JDBC").options(
    url = url,
    dbtable = "(SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29') temp",
    **properties
).load()

df.show()
...