Ошибки JDBC при подключении Zeppelin Pyspark к Redshift - PullRequest
0 голосов
/ 22 февраля 2019

Я использую: EMR 5.20 Zeppelin 0.8.0 Spark 2.4.0

Мне удалось добавить интерпретатор Redshift, но я не могу вытащить данные в фрейм данных pyspark.Все, что я хочу сделать, - это скопировать таблицу красного смещения в информационный фрейм spark sql.

Я использовал wget для помещения драйвера красного смещения в библиотеку zeppelin.

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Затем я могу запросить базу данных с помощью следующей строки кода с добавленным мной интерпретатором.

%Redshift
select * from public.debtors

Тем не менее, я не могу использовать драйвер в спарк для извлечения данных.Если есть более простой способ, дайте мне знать.Я запустил это с самого начала, так как сначала требуется z.load.

%dep 
z.load("/usr/lib/zeppelin/lib/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar")


%pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql import DataFrameReader
sql_context = SQLContext(sc)

import os 
print(os.chdir('/usr/lib/zeppelin/lib/'))
print(os.getcwd())

def redshift_to_spark(sql_context, user, host, port, database, redshift_password, sql_query, spark_table, partition_count=100):
    url = 'jdbc:postgres://{host}:{port}/{database}'.format(
        host=host,
        port=port,
        database=database
    )
    properties = {'user': user, 'password': redshift_password, 'driver': 'org.postgresql.Driver'} 
    data_frame = DataFrameReader(sql_context).jdbc(
        url=url, table=sql_query, properties=properties, numPartitions=partition_count
    )
    data_frame.registerTempTable(spark_table)
    return data_frame

redshift_dates_sql = "select * from dates"

dates = redshift_to_spark(
    sql_query=redshift_dates_sql,
    sql_context=sql_context,
    user="******",
    host="******",
    port=******,
    database='dev',
    redshift_password=******,
    spark_table='dates'
)

Это сработало для моих коллег через терминал.Тем не менее, я сталкиваюсь с проблемами, когда я запускаю это в Zeppelin.Я получаю следующие ошибки:

Py4JJavaError: An error occurred while calling o249.jdbc.
: java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:71)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:238)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o249.jdbc.\n', JavaObject id=o251), <traceback object at 0x7fb781d01200>)

Я открыт для любых попыток.Дайте мне знать, если у вас есть идеи.

...