Я написал приведенный ниже код, который хочу прочитать из kafka, и записать файлы паркета, разделенные по годам, месяцам, дням и часам. В метке я вижу операцию сортировки (изображение ниже). Эта операция сортировки собирается сортировать внутри исполнителя или перемещать данные между исполнителями? Я не ожидал бы, что работа должна будет перетасовывать данные между исполнителями, потому что один исполнитель может читать из kafka и записывать автономный файл паркета в папку.
private val year = date_format($"timestamp", "yyyy").alias("year")
private val month = date_format($"timestamp", "MM").alias("month")
private val day = date_format($"timestamp", "dd").alias("day")
private val hour = date_format($"timestamp", "HH").alias("hour")
val source = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.load()
// transformation
.select(year, month, day, hour, $"key", $"value",
$"topic", $"partition".alias("partition_int"), $"offset", $"timestamp".cast("long").alias("timestamp_ms"),
$"timestampType".alias("timestamp_type"))
.writeStream
.trigger(Trigger.ProcessingTime(1000 * 60 * 3))
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
batchDF
.write
.mode("append")
.partitionBy("year", "month", "day", "hour")
.parquet(tableRoot)
}).option("checkpointLocation", checkpointLocation)
.start().awaitTermination()