Как обновить фрейм данных Spark на основе столбца из другого фрейма данных со многими записями в Scala? - PullRequest
2 голосов
/ 08 мая 2019

Я работаю с фреймами данных Spark и хочу обновить столбец column_to_be_updated в таблице кустов, используя spark-sql в Scala.

Мой код до сих пор работает с небольшими фреймами данных:

var data_frame = spark.sql("Select ... From TableXX")

var id_list = spark.sql("Select Id From TableXY Where ...")..collect().map(_(0)).toList

data_frame.withColumn("column_to_be_updated", when($"other_column_of_frame".isin(id_list:_*), 1)
    .otherwise($"column_to_be_updated"))

Я хочу обновить столбец column_to_be_updated, если запись в other_column-of_frame находится в столбце id TableXY.Мой обходной путь заключается в том, чтобы сначала преобразовать столбец id в список, а затем использовать .isin -statement.

Однако у меня есть много строк в TableXY и TableXX, так что, похоже, происходит сбойи перегрузить id_list.Есть ли другое решение или более эффективное решение для того, чего я пытаюсь достичь?

Заранее спасибо!

1 Ответ

2 голосов
/ 08 мая 2019

Вы можете объединить кадры данных, используя внешнее левое соединение. Таким образом, столбец Id можно добавить к data_frame в строках, где other_column_of_frame находится в списке идентификаторов. Затем просто проверьте, является ли добавленный столбец Id нулевым или нет.

val ids = spark.sql("Select Id From TableXY Where ...")
val updated = data_frame
  .join(broadcast(ids), ids.col("Id") === data_frame.col("other_column_of_frame"), "left_outer")
  .withColumn("column_to_be_updated", when($"Id".isNotNull, 1).otherwise($"column_to_be_updated"))
  .drop("Id")

Вы можете прочитать о broadcast здесь: Оптимизация объединения DataFrame - Broadcast Hash Join

...