Неожиданный сорт в искровом даге - PullRequest
1 голос
/ 01 мая 2020

Я написал приведенный ниже код, который хочу прочитать из kafka, и записать файлы паркета, разделенные по годам, месяцам, дням и часам. В метке я вижу операцию сортировки (изображение ниже). Эта операция сортировки собирается сортировать внутри исполнителя или перемещать данные между исполнителями? Я не ожидал бы, что работа должна будет перетасовывать данные между исполнителями, потому что один исполнитель может читать из kafka и записывать автономный файл паркета в папку.

     private val year = date_format($"timestamp", "yyyy").alias("year")
     private val month = date_format($"timestamp", "MM").alias("month")
     private val day = date_format($"timestamp", "dd").alias("day")
     private val hour = date_format($"timestamp", "HH").alias("hour")
     val source =  spark.readStream
      .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
      .load()
    // transformation
     .select(year, month, day, hour, $"key", $"value",
      $"topic", $"partition".alias("partition_int"), $"offset", $"timestamp".cast("long").alias("timestamp_ms"),
      $"timestampType".alias("timestamp_type"))
     .writeStream
     .trigger(Trigger.ProcessingTime(1000 * 60 * 3))
     .foreachBatch((batchDF: DataFrame, batchId: Long) => {
                batchDF
                   .write
                   .mode("append")
                   .partitionBy("year", "month", "day", "hour")
                   .parquet(tableRoot)
     }).option("checkpointLocation", checkpointLocation)
     .start().awaitTermination()

DAG with sort operation

1 Ответ

1 голос
/ 01 мая 2020

Это просто сортировка по заданию. Никаких перестановок по разделам.

...