У меня есть долгосрочное задание на структурированную потоковую передачу Spark, запущенное в Google Cloud Dataproc, в котором Kafka используется как источник и приемник. Я также сохраняю свои контрольные точки в облачном хранилище Google.
После работы в течение недели я заметил, что она постоянно использует все дисковое хранилище объемом 100 ГБ, сохраняя файлы в /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...
.
* 1005. * Насколько я понимаю, моя работа Spark не должна зависеть от локального дискового хранилища.
Я совершенно не понимаю этого здесь?
Я отправил свою работу так:
(cd app/src/packages/ && zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
--cluster cluster-26aa \
--region us-central1 \
--properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
--py-files build/mypkg.zip \
--max-failures-per-hour 10 \
--verbosity info \
app/src/explode_rmq.py
Это важные части моей работы:
Источник:
spark = SparkSession \
.builder \
.appName("MyApp") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile('mypkg.zip')
df = spark \
.readStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("subscribe", "lsport-rmq-12") \
.option("startingOffsets", "earliest") \
.load() \
.select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))
Раковина:
sink_kafka_q = sink_df \
.writeStream \
.format("kafka") \
.options(**config.KAFKA_PARAMS) \
.option("topic", "my_topic") \
.option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
.start()