Итерация Spark Dataframe работает медленно - PullRequest
0 голосов
/ 08 мая 2018

Я хотел бы проверить данные существующего столбца и создать новый столбец на основе определенных условий.

Проблема: у меня есть набор данных около 500 столбцов и 9K строк (9000). По моей логике, если один из столбцов имеет какое-либо нулевое значение, то создайте новый столбец относительно этого столбца и установите нулевые значения исходного столбца в 1 и оставьте 0.

Но ниже простой код занимает несколько часов, хотя мои данные невелики.

dataset_.schema.fields.map(c => {
  if(dataset_.filter(col(c.name).isNull).count() > 0)
  {
    dataset_ = dataset_.withColumn(c.name + "_isNull", when(col(c.name).isNull, 1).otherwise(0))
  }
})

Пожалуйста, помогите мне оптимизировать мой код или предоставьте обратную связь, чтобы добиться этого с помощью разностного подхода.

Примечание: я пробовал то же самое на большом кластере (искровая пряжа). Кластер Google Dataproc (3 рабочих узла, тип компьютера 32 vCPU, память 280 ГБ)

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

Вычислить все значения одновременно:

val convert = df.select(
  df.columns.map(c => (count(c) =!= count("*")).alias(c)): _*
).first.getValuesMap[Boolean](df.columns)

и использовать результат для добавления столбцов

convert.collect { case (c, true) => c }.foldLeft(df) {
  (df, c) => df.withColumn(s"${c}_isNull", when(col(c).isNull, 1).otherwise(0))
}
0 голосов
/ 08 мая 2018

Я попробовал несколько вещей ...
1) Попробуйте кэшировать при создании dataframe из файла csv или любого другого источника

2) Кроме того, если это не влияет на логику, мы можем попробовать изменить это if(dataset_.filter(col(c.name).isNull).count() > 0) до if(flightData.filter(col(x.name).isNull).take(1) != null )
Вместо того, чтобы считать все данные, мы можем просто проверить, было ли какое-либо имя столбца нулевым или нет Поскольку take(1) будет двигаться дальше, как только он найдет хотя бы одну запись, тогда как .count() продолжит работу и затем сравнит ее с 0

3) Более того, согласно текущей логике, мы можем изменить map на foreach. Однако это не повлияет на производительность, но в идеале должно быть foreach.

Я пробовал это на наборе данных, имеющем 16 столбцов и около 10 записей Lakh. Прошло 33 с после применения всех этих.

Вот снимок экрана Spark UI! enter image description here

Поскольку у вас есть 500 столбцов, время выполнения должно уменьшиться в массовом масштабе при применении их по сравнению с моим набором данных.
Надеюсь, это поможет!

...