Первый этап = чтение файла. Из-за перераспределения (поскольку его широкое преобразование требует перетасовки), его нельзя объединить в один этап с частичным_счетом (2-й этап)
Второй этап = локальный счет (вычисление количества на раздел)
Третий этап = агрегация результатов на драйвере.
Spark-генерирование отдельной стадии за действие или широкое преобразование. Чтобы получить более подробную информацию о узких / широких преобразованиях и о том, почему широкое преобразование требует отдельного этапа, взгляните на «Широкие и узкие зависимости, High Performance Spark, Holden Karau» или в этой статье .
Давайте проверим это предположение локально. Сначала вам нужно создать набор данных:
набор данных / test-data.json
[
{ "key": 1, "value": "a" },
{ "key": 2, "value": "b" },
{ "key": 3, "value": "c" },
{ "key": 4, "value": "d" },
{ "key": 5, "value": "e" },
{ "key": 6, "value": "f" },
{ "key": 7, "value": "g" },
{ "key": 8, "value": "h" }
]
, чем запустить следующий код:
StructType schema = new StructType()
.add("key", DataTypes.IntegerType)
.add("value", DataTypes.StringType);
SparkSession session = SparkSession.builder()
.appName("sandbox")
.master("local[*]")
.getOrCreate();
session
.read()
.schema(schema)
.json("file:///C:/<you_path>/dataset")
.repartition(4) // comment on the second run
.registerTempTable("df");
session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();
Вывод будет выглядеть следующим образом:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
+- Exchange RoundRobinPartitioning(4)
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
Но если вы закомментируете / удалите строку .repartition (4), обратите внимание, что TableScan и part_count выполняются в пределах одного этапа, и вывод будет следующим:
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
PS Обратите внимание, что дополнительный каскад может оказать существенное влияние на производительность, поскольку требует дискового ввода-вывода (смотрите здесь ) и является своего рода барьером синхронизации, влияющим на распараллеливание, что означает в большинствеслучаи Spark не начнет этап 2, пока этап 1 не будет завершен. Тем не менее, если repartition
повысить уровень параллелизма, это, вероятно, того стоит.