Как написать поток на Amazon S3, используя структурированный потоковый Pyspark? - PullRequest
0 голосов
/ 06 июня 2018

Я не получаю данные в мое ведро s3 и не получаю никакой ошибки.Я работаю с Spark 2.3.0 и Python.По сути, я пытаюсь создать Datalake, извлекающий данные из Kafka с использованием Структурированной потоковой искры, и я хотел бы записать поток в корзину S3, но не могу.Я хотел бы знать, как это сделать.Моя основная идея заключается в следующем:

query = self.query_impressions \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", self.join_path + "/applicationHistory").partitionBy("year", "month", "day", "hour") \
        .option("path", "s3a://datalake/test")

Я не знаю, нужно ли мне использовать некоторые пакеты, конфигурации, разрешения или другие вещи.У меня есть эти конфигурации:

import os
import time
import subprocess
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --conf spark.hadoop.fs.s3a.endpoint=s3.eu-central-1.amazonaws.com \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true \ 
  --conf spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
  --conf spark.hadoop.fs.s3a.access.key="xxx" \ 
  --conf spark.hadoop.fs.s3a.secret.key="yyy" \
  pyspark-shell'
...