Как следует из Обучающая программа AWS необходимо предоставить драйвер JDBC
wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar
После того, как этот jar был загружен и сделан доступным для команды spark-submit
, я предоставил ей следующие зависимости:
spark-submit --master yarn --deploy-mode cluster \
--jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
--packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
my_script.py
Наконец, это my_script.py
, который я передал spark-submit
from pyspark.sql import SparkSession
def foreach_batch_function(df, epoch_id):
df.write\
.format("com.databricks.spark.redshift") \
.option("aws_iam_role", my_role) \
.option("url", my_redshift_url) \
.option("user", my_redshift_user) \
.option("password", my_redshift_password) \
.option("dbtable", my_redshift_schema + "." + my_redshift_table)\
.option("tempdir", "s3://my/temp/dir") \
.mode("append")\
.save()
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)
my_schema = spark.read.parquet(my_schema_file_path).schema
df = spark \
.readStream \
.schema(my_schema) \
.option("maxFilesPerTrigger", 100) \
.parquet(my_source_path)
df.writeStream \
.trigger(processingTime='30 seconds') \
.foreachBatch(foreach_batch_function)\
.option("checkpointLocation", my_checkpoint_location) \
.start(outputMode="update").awaitTermination()