Окружающая среда
- Скала
- Apache Spark: Spark 2.2.1
- EMR на AWS: emr-5.12.1
Содержание
У меня есть один большой DataFrame, как показано ниже:
val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")
JSON-файлы размером ~ 1 ТБ в s3://some_bucket
включают 5000 разделов group_id
.
Я хочу выполнить преобразование с использованием SparkSQL, и оно отличается от каждого group_id
.
Код Spark выглядит так:
// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")
// one of queries like this:
// SELECT
// col1 as userId,
// col2 as userName,
// .....
// FROM
// lakeView
// WHERE
// group_id = xxx;
val queries: Seq[String] = getGroupIdMapping
// ** Want to know better ways **
queries.par.foreach(query => {
val convertedDF: DataFrame = spark.sql(query)
convertedDF.write.save("s3://another_bucket/")
})
par
может распараллелить на Runtime.getRuntime.availableProcessors
num, и оно будет равно числу ядер драйверов.
Но это кажется странным и недостаточно эффективным, потому что это не имеет никакого отношения к парализации Спарка.
Я действительно хочу сделать что-то вроде groupBy
в scala.collection.Seq
.
Это неправильный искровой код:
df.groupBy(groupId).foreach((groupId, parDF) => {
parDF.createOrReplaceTempView("lakeView")
val convertedDF: DataFrame = spark.sql(queryByGroupId)
convertedDF.write.save("s3://another_bucket")
})