Почему действие по подсчету искры выполнено в три этапа - PullRequest
1 голос
/ 06 ноября 2019

Я загрузил CSV-файл. Повторно разделил его на 4, а затем принял к сведению DataFrame. И когда я посмотрел на DAG, я увидел, что это действие выполняется в 3 этапа.

enter image description here

Почему это простое действие выполняется в 3 этапа. Я полагаю, 1-й этап - загрузка файла, а 2-й - поиск счетчика для каждого раздела.

Итак, что происходит на 3-м этапе?

Вот мой код

val sample = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("sample_data.csv")

sample.repartition(4).count()

1 Ответ

2 голосов
/ 06 ноября 2019
  1. Первый этап = чтение файла. Из-за перераспределения (поскольку его широкое преобразование требует перетасовки), его нельзя объединить в один этап с частичным_счетом (2-й этап)

  2. Второй этап = локальный счет (вычисление количества на раздел)

  3. Третий этап = агрегация результатов на драйвере.

Spark-генерирование отдельной стадии за действие или широкое преобразование. Чтобы получить более подробную информацию о узких / широких преобразованиях и о том, почему широкое преобразование требует отдельного этапа, взгляните на «Широкие и узкие зависимости, High Performance Spark, Holden Karau» или в этой статье .

Давайте проверим это предположение локально. Сначала вам нужно создать набор данных:

набор данных / test-data.json

[
  { "key":  1, "value":  "a" },
  { "key":  2, "value":  "b" },
  { "key":  3, "value":  "c" },
  { "key":  4, "value":  "d" },
  { "key":  5, "value":  "e" },
  { "key":  6, "value":  "f" },
  { "key":  7, "value":  "g" },
  { "key":  8, "value":  "h" }
]

, чем запустить следующий код:

    StructType schema = new StructType()
            .add("key", DataTypes.IntegerType)
            .add("value", DataTypes.StringType);

    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<you_path>/dataset")
            .repartition(4) // comment on the second run
            .registerTempTable("df");

    session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();

Вывод будет выглядеть следующим образом:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Но если вы закомментируете / удалите строку .repartition (4), обратите внимание, что TableScan и part_count выполняются в пределах одного этапа, и вывод будет следующим:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

PS Обратите внимание, что дополнительный каскад может оказать существенное влияние на производительность, поскольку требует дискового ввода-вывода (смотрите здесь ) и является своего рода барьером синхронизации, влияющим на распараллеливание, что означает в большинствеслучаи Spark не начнет этап 2, пока этап 1 не будет завершен. Тем не менее, если repartition повысить уровень параллелизма, это, вероятно, того стоит.

...