pyspark writeStream: каждая строка фрейма данных в отдельном файле json - PullRequest
0 голосов
/ 12 марта 2020

Я использую pyspark для чтения данных из топики Kafka c в качестве потокового фрейма данных следующим образом:

spark = SparkSession.builder \
  .appName("Spark Structured Streaming from Kafka") \
  .getOrCreate()

sdf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load() \
  .select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))

sdf_ = sdf.select("parsed_value.*")

Моя цель - записать каждую из sdf_ строк как отдельно json файлов. Следующий код:

writing_sink = sdf_.writeStream \
    .format("json") \
    .option("path", "/Desktop/...") \
    .option("checkpointLocation", "/Desktop/...") \
    .start()

writing_sink.awaitTermination()

запишет несколько строк кадра данных в одном и том же json, в зависимости от размера микропакета (или, по крайней мере, это моя гипотеза). Мне нужно настроить вышеизложенное, чтобы каждая строка данных была записана в отдельный файл json.

Я также попытался использовать partitionBy('column'), но все равно это не будет делать именно то, что мне нужно, но вместо этого создаст папки, в которых в файлах json может быть записано несколько строк (если они имеют тот же идентификатор).

Есть идеи, которые могут помочь здесь? Заранее спасибо.

1 Ответ

1 голос
/ 13 марта 2020

Обнаружил, что следующий вариант делает трюк:

   .option("maxRecordsPerFile", 1)
...