Вот пример паркетной мойки:
# parquet sink example
targetParquetHDFS = sourceTopicKAFKA
.writeStream
.format("parquet") # can be "orc", "json", "csv", etc.
.outputMode("append") # can only be "append"
.option("path", "path/to/destination/dir")
.partitionBy("col") # if you need to partition
.trigger(processingTime="...") # "mini-batch" frequency when data is outputed to sink
.option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
.start()
targetParquetHDFS.awaitTermination()
Для более подробной информации:
Интеграция Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Руководство по программированию SS: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
добавлено
Хорошо ... Я добавил кое-что к ответу, чтобы прояснить ваш вопрос.
У SS есть несколько различных типов триггеров :
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
по умолчанию : следующий триггер происходит после завершения обработки предыдущего триггера
фиксированные интервалы : .trigger(processingTime='10 seconds')
, поэтому сработает 10-секундный триггер в 00:10, 00:20, 00: 30
единовременно : обрабатывает все доступные данные водин раз .trigger(once=True)
непрерывный / фиксированный интервал контрольной точки => лучше всего видеть руководство по программированию doc
Поэтому в вашем примере Kafka SS может обрабатывать данные о времени событияотметка времени в микропакетах с помощью триггеров « default » или « фиксированный интервал » или « однократная » обработка всех доступных данныхв теме исходного кода Кафки.