Я использую pyspark для чтения данных из топики Kafka c в качестве потокового фрейма данных следующим образом:
spark = SparkSession.builder \
.appName("Spark Structured Streaming from Kafka") \
.getOrCreate()
sdf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load() \
.select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))
sdf_ = sdf.select("parsed_value.*")
Моя цель - записать каждую из sdf_
строк как отдельно json файлов. Следующий код:
writing_sink = sdf_.writeStream \
.format("json") \
.option("path", "/Desktop/...") \
.option("checkpointLocation", "/Desktop/...") \
.start()
writing_sink.awaitTermination()
запишет несколько строк кадра данных в одном и том же json, в зависимости от размера микропакета (или, по крайней мере, это моя гипотеза). Мне нужно настроить вышеизложенное, чтобы каждая строка данных была записана в отдельный файл json.
Я также попытался использовать partitionBy('column')
, но все равно это не будет делать именно то, что мне нужно, но вместо этого создаст папки, в которых в файлах json может быть записано несколько строк (если они имеют тот же идентификатор).
Есть идеи, которые могут помочь здесь? Заранее спасибо.