Подключившись к Redshift через PySpark, как заставить драйверы работать? - PullRequest
4 голосов
/ 09 июля 2020

Я пытался подключиться к Redshift с помощью Pyspark и получил ошибку Failed to find data source: com.databricks.spark.redshift. Я наконец смог избавиться от этой ошибки, вручную вызвав все драйверы, но теперь я получаю новую ошибку. Что мне следует делать по-другому?

Вот мой код:

import findspark
findspark.init()

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

jars = ["C:\\Users\\vpena\\Documents\\spark\\spark-redshift_2.11-3.0.0-preview1.jar",
        "C:\\Users\\vpena\\Documents\\spark\\RedshiftJDBC42-no-awssdk-1.2.45.1069.jar",
        "C:\\Users\\vpena\\Documents\\spark\\minimal-json-0.9.4.jar",
        "C:\\Users\\vpena\\Documents\\spark\\spark-avro_2.11-3.0.0.jar"
       ]

ss = SparkSession.builder.config("spark.jars",  ",".join(jars)).getOrCreate()
sc = ss.sparkContext
sql_context = SQLContext(sc)



df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", f"jdbc:redshift://{keys['host']}:5439/{keys['database']}?user={keys['username']}&password={keys['password']}") \
    .option("query", "SELECT * FROM data_science_lab.iap_cube_user") \
    .option("tempdir", f"s3n://{keys_boto3['aws_access_key_id']}:{keys_boto3['aws_secret_access_key']}@{bucket_name}/tempvpena") \
    .load()

Новая ошибка, которую я получаю:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<timed exec> in <module>

~\Documents\spark\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

~\Documents\spark\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~\Documents\spark\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

~\Documents\spark\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o64.load.
: java.lang.NoClassDefFoundError: scala/Product$class
    at com.databricks.spark.redshift.Parameters$MergedParameters.<init>(Parameters.scala:89)
    at com.databricks.spark.redshift.Parameters$.mergeParameters(Parameters.scala:83)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)

Это правильный способ читать данные из Redshift в Pyspark? Я прочитал эту документацию, но она не слишком полезна.

Дополнительная информация: я пытаюсь запустить этот код из Jupyter Notebook, и у меня есть драйверы из здесь .

...