Есть ли альтернатива сделать итеративное объединение в spark - scala - PullRequest
1 голос
/ 05 ноября 2019

Вариант использования - найти n максимум строк в данном столбце (это может быть n номеров столбцов), и когда у вас есть n ключей, вы присоединяете его к исходному набору данных, чтобы получить все необходимые вам строки

val df = Seq (("12", "Tom", "Hanks"), ("13", "Meryl", "Streep"), ("12", "Tom", "Hardy"), ("12", "Джон", "Стрип")) .toDF ("возраст", "имя", "фамилия")

Позволяет сказать, что я хочу присоединиться к каждому столбцу индивидуально с набором данных более крупных актерову которого есть все три столбца выше.

val v1 = actors.join(df, Seq("id"), "inner")
val v2 =actors.join(df, Seq("firstname"), "inner")
val v3 =actors.join(df, Seq("lastname"), "inner")
val output = v1.union(v2).union(v3)

Есть ли способ не делать это итеративно? Кроме того, потому что столбцы, которые будут объединены, могут быть динамическими. например, иногда это может быть только идентификатор или только идентификатор и имя.

Ответы [ 3 ]

2 голосов
/ 05 ноября 2019

Вы можете попробовать другой подход, так что вы можете добиться этого следующим образом:

actors.join(df).where(
actors("id") === df("id") || 
actors("firstname") === df("firstname") || 
actors("lastname") === df("lastname")
)

и для n-столбцов вы можете попробовать это:

  val joinCols = Seq("id", "firstname", "lastname") // or actors.columns
  val condition = joinCols
    .map(s => (actors(s) === df(s)))
    .reduce((a, b) => a || b)

вы получите ниже условия:

condition.explain(true)
(((a#7 = a#7) || (b#8 = b#8)) || (c#9 = c#9))

и, наконец, используйте его:

   actors.join(df).where(condition)
0 голосов
/ 05 ноября 2019

@ решение chlebek должно работать идеально, это еще один подход в случае, если вы хотите воспроизвести свою первоначальную логику:

val cols = Seq("id", "firstname", "lastname")

val final_df = cols.map{
     df.join(actors, Seq(_), "inner") 
}
.reduce(_ union _)

Сначала мы генерируем одно внутреннее соединение застолбец, то мы союз их.

0 голосов
/ 05 ноября 2019

Я думаю, что решение проблемы с трансляцией с меньшим набором данных и udf для проверки с большим набором данных. я продолжал думать в терминах объединений!

...