Список строк для распараллеливания Spark - PullRequest
0 голосов
/ 18 июня 2020

У меня есть фрейм данных df

col1, col2, date
  a1,   b1, 2020-04-01
  a2,   b2, 2020-04-02
  a3,   b3, 2020-04-03

Я хочу записывать каждую дату в свое место в s3. Я собираю date как List[String], затем l oop через каждое значение, чтобы отфильтровать df и выписать.

val dateStr = df.select(col(date)).distinct.collect().toList.map(x => x(0).toString)
dateStr.foreach { d =>
val dateModified = d.replaceAll("-","/")
inputDf
.filter(inputDf(incrementIdentifierCol) === d)
.write.parquet(s"s3://bucket/$dateModified")
}

Есть ли способ распараллелить dateStr для фильтрации по кадру данных и писать вместо того, чтобы идти по одному?

Я знаю, что могу

df.partitionBy("date").write.parquet("s3://bucket/")

, но я не хочу, чтобы местоположение было s3://bucket/date=2020-04-01. Я хочу, чтобы это было s3://bucket/2020/04/01, поэтому я собираю и запускаю foreach.

1 Ответ

0 голосов
/ 19 июня 2020

Выполнение записи в al oop - не лучший вариант, так как Spark создаст отдельный этап для каждой операции записи. Это означает, что в случае сбоя при записи будет сложно определить, какие данные были загружены, а какие не были загружены, особенно если это некоторая инкрементная загрузка данных. Поэтому я предлагаю сделать все за один этап написания.

partitionBy метод может принимать в качестве параметра более одного имени раздела. Добавьте еще три столбца (год, месяц, день) в фрейм данных, который вы хотите записать, с помощью метода withColumn.

df
    .withColumn("year", lit("2020"))
    .withColumn("month", lit("06"))
    .withColumn("day", lit("19"))
    .write
    .partitionBy("year", "month", "day")
    .parquet("s3://bucket/" )

Жестко заданные значения года, месяца, дня могут быть заменены значениями из столбца date.

...