Как распараллелить spark.read.parquet ()? - PullRequest
0 голосов
/ 11 февраля 2020

Моя работа Spark считывает папку с паркетными данными, разделенными столбцом раздел :

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .coalesce(1)
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

Я заметил, что создается только одна задача, поэтому она очень медленная. Внутри исходной папки находится множество подпапок, таких как partition = 2019-01-07 , и каждая подпапка содержит множество файлов с расширением snappy.parquet . Я отправляю задание - num-executors 2 --executor-cores 4 , и оперативная память не является проблемой. Я пытался читать как из S3, так и из локальной файловой системы. Я попытался добавить .repartition (nPartitions) , удалив .coalesce (1) и .partitionBy ("date") , но то же самое.

Не могли бы вы предложить, как я могу заставить Spark читать эти файлы паркета параллельно?

1 Ответ

0 голосов
/ 12 февраля 2020

Ну, я понял правильный код:

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .option("mergeSchema", "true")
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

Надеюсь, это сэкономит кому-то время в будущем.

...