Я не уверен, чего вы пытаетесь достичь, используя подход, который вы показали, но я чувствую, что вы, возможно, пойдете по этому пути трудным путем .Если для этого нет веской причины, часто лучше всего позволить Spark (особенно spark 2.0+), чтобы он делал свое дело.В этом случае просто обработайте все три раздела, используя одну операцию.Spark обычно хорошо управляет вашим набором данных.Он также может автоматически вводить оптимизацию, о которой вы не думали, или оптимизацию, которую он не может выполнить, если вы пытаетесь слишком сильно контролировать процесс.Сказав это, если он плохо управляет процессом, тогда вы можете начать спорить с ним, пытаясь получить больше контроля и делать больше вручную.По крайней мере, таков мой опыт.
Например, однажды у меня была сложная серия преобразований, которые добавляли больше логики к каждому шагу / DataFrame.Если бы я заставил спарк оценивать каждый промежуточный (например, запустить подсчет или шоу на промежуточных фреймах данных), я бы в конечном итоге попал в точку, где он не мог оценить один фрейм данных (т. Е. Не смог сделать подсчет) из-за недостаточногоРесурсы.Однако, если я проигнорировал это и добавил дополнительные преобразования, Spark смог перенести некоторые оптимизации на более ранние этапы (с более поздних этапов).Это означало, что последующие DataFrames (и, что важно, мой последний DataFrame) могли быть оценены правильно.Эта окончательная оценка была возможна несмотря на тот факт, что промежуточный DataFrame, который сам по себе не мог быть оценен, все еще находился в общем процессе.
Рассмотрим следующее.Я использую CSV, но он будет работать так же для паркета.
Вот мой ввод:
data
├── tag=A
│ └── data.csv
├── tag=B
│ └── data.csv
└── tag=C
└── data.csv
Вот пример одного из файлов данных (tag = A / data.csv)
id,name,amount
1,Fred,100
2,Jane,200
Вот скрипт, который распознает разделы в этой структуре (т. е. тег является одним из столбцов).
scala> val inDataDF = spark.read.option("header","true").option("inferSchema","true").csv("data")
inDataDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> inDataDF.show
+---+-------+------+---+
| id| name|amount|tag|
+---+-------+------+---+
| 31| Scott| 3100| C|
| 32|Barnaby| 3200| C|
| 20| Bill| 2000| B|
| 21| Julia| 2100| B|
| 1| Fred| 100| A|
| 2| Jane| 200| A|
+---+-------+------+---+
scala> inDataDF.printSchema
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- amount: integer (nullable = true)
|-- tag: string (nullable = true)
scala> inDataDF.write.partitionBy("tag").csv("outData")
scala>
Опять я использовал csv, а не parquet, так что выможет обойтись без опций, чтобы прочитать заголовок и вывести схему (паркет сделает это автоматически).Но кроме этого, он будет работать так же.
Выше приведена следующая структура каталогов:
outData/
├── _SUCCESS
├── tag=A
│ └── part-00002-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
├── tag=B
│ └── part-00001-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
└── tag=C
└── part-00000-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
Если вы хотите манипулировать контентом, обязательно добавьте любую операцию карты, объединение, фильтрация или что-либо еще, что вам нужно между чтением и записью.
Например, добавьте 500 к сумме:
scala> val outDataDF = inDataDF.withColumn("amount", $"amount" + 500)
outDataDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> outDataDF.show(false)
+---+-------+------+---+
|id |name |amount|tag|
+---+-------+------+---+
|31 |Scott |3600 |C |
|32 |Barnaby|3700 |C |
|20 |Bill |2500 |B |
|21 |Julia |2600 |B |
|1 |Fred |600 |A |
|2 |Jane |700 |A |
+---+-------+------+---+
Затем просто напишите outDataDF вместо inDataDF.