Зависимости Spark 2.4.0 для записи в AWS Redshift - PullRequest
0 голосов
/ 16 апреля 2019

Я изо всех сил пытаюсь найти правильную зависимость пакетов и их относительную версию для записи в базу данных Redshfit с помощью микропакета Pyspark.

Каковы правильные зависимости для достижения этой цели?

1 Ответ

0 голосов
/ 16 апреля 2019

Как следует из Обучающая программа AWS необходимо предоставить драйвер JDBC

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar

После того, как этот jar был загружен и сделан доступным для команды spark-submit, я предоставил ей следующие зависимости:

spark-submit --master yarn --deploy-mode cluster \
  --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
  --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
  my_script.py

Наконец, это my_script.py, который я передал spark-submit

from pyspark.sql import SparkSession

def foreach_batch_function(df, epoch_id):
    df.write\
        .format("com.databricks.spark.redshift") \
        .option("aws_iam_role", my_role) \
        .option("url", my_redshift_url) \
        .option("user", my_redshift_user) \
        .option("password", my_redshift_password) \
        .option("dbtable", my_redshift_schema + "." + my_redshift_table)\
        .option("tempdir", "s3://my/temp/dir") \
        .mode("append")\
        .save()

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)

my_schema = spark.read.parquet(my_schema_file_path).schema

df = spark \
    .readStream \
    .schema(my_schema) \
    .option("maxFilesPerTrigger", 100) \
    .parquet(my_source_path)

df.writeStream \
    .trigger(processingTime='30 seconds') \
    .foreachBatch(foreach_batch_function)\
    .option("checkpointLocation", my_checkpoint_location) \
    .start(outputMode="update").awaitTermination()
...