Эффективный способ присоединиться и объединиться в искру и минимизировать тасование - PullRequest
0 голосов
/ 06 июля 2018

У меня есть два больших фрейма данных с примерно несколькими миллионами записей в каждом.

val df1 = Seq(
 ("k1a","k2a", "g1x","g2x")
,("k1b","k2b", "g1x","g2x")
,("k1c","k2c", "g1x","g2y")
,("k1d","k2d", "g1y","g2y")
,("k1e","k2e", "g1y","g2y")
,("k1f","k2f", "g1z","g2y")
).toDF("key1", "key2", "grp1","grp2")

val df2 = Seq(
 ("k1a","k2a", "v4a")
,("k1b","k2b", "v4b")
,("k1c","k2c", "v4c")
,("k1d","k2d", "v4d")
,("k1e","k2e", "v4e")
,("k1f","k2f", "v4f")
).toDF("key1", "key2", "fld4")

Я пытаюсь присоединиться и выполнить groupBy, как показано ниже, но это всегда требует результата. В df1 есть около миллиона уникальных экземпляров данных grp1 + grp2.

val df3 = df1.join(df2,Seq("key1","key2"))
val df4 = df3.groupBy("grp1","grp2").agg(collect_list(struct($"key1",$"key2")).as("dups")).filter("size(dups)>1")

Есть ли способ уменьшить тасование? Является ли mapPartitions правильным подходом для этих двух сценариев? Может кто-нибудь предложить эффективный способ с примером.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...