Сравнение двух фреймов данных в Spark (производительность) - PullRequest
0 голосов
/ 07 января 2019

Мне нужно сравнить два кадра данных в моем искровом приложении. Я прошел следующий пост. Как получить разницу между двумя фреймами данных?

Однако я не понимаю, почему подход в лучшем ответе

df1.unionAll(df2).except(df1.intersect(df2))

лучше, чем тот, о котором идет речь

df1.except(df2).union(df2.except(df1))

Кто-нибудь может объяснить? Насколько я понимаю, последний работает с двумя меньшими наборами данных, а первый работает с большим набором данных. Это потому, что последний делает отдельное как часть союза? Даже тогда, если более вероятно, что два фрейма данных имеют одинаковые записи, в последнем случае мы имеем дело с небольшим набором данных.

Ответы [ 2 ]

0 голосов
/ 07 января 2019

Давайте рассмотрим сценарий, в котором оба значения df1 и df2 (соответственно размера N и M) слишком велики, чтобы их можно было транслировать, но между df1 и df2 нет перекрытия.

Давайте назовем это результатом di. В этом случае для df1.intersect(df2) потребуется полное перемешивание из N + M строк, однако размер вывода будет равен 0. В таком случае df1.unionAll(df2).except(di) может быть выполнено как широковещательное соединение (такая оптимизация может потребовать адаптивное выполнение , если пользователь не навязывает конкретный план). Также важно отметить, что такой план не требует кэширования.

В отличие от этого стоимость df1.except(df2).union(df2.except(df1)) будет постоянной в отношении мощности пересечения.

В то же время, если d1 слишком велик для трансляции, у него уже есть разделение, совместимое с except, поэтому оставшийся запрос не должен требовать дополнительного перемешивания.

0 голосов
/ 07 января 2019

Перво-наперво - unionAll устарело с версии 2 Spark. Пожалуйста, используйте union вместо этого, как вы сделали во втором фрагменте.

Во-вторых, в ответах на вопрос, на который вы ссылаетесь, нет информации, что первый фрагмент кода лучше. Я подготовил такой сценарий. Для меня первый занял 31, а второй 18. В моем случае df1 имеет ~ 3 миллиона строк и df2 ~ 1 миллион, по 5 столбцов в каждой.

Если мы теперь проанализируем оптимизированный логический план выполнения для первого запроса:

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Aggregate [_c0#10, _c1#11, _c2#12, _c3#13, _c4#14], [cast(_c0#10 as string) AS _c0#67, cast(_c1#11 as string) AS _c1#68, cast(_c2#12 as string) AS _c2#69, cast(_c3#13 as string) AS _c3#70, cast(_c4#14 as string) AS _c4#71]
      +- Join LeftAnti, (((((_c0#10 <=> _c0#52) && (_c1#11 <=> _c1#53)) && (_c2#12 <=> _c2#54)) && (_c3#13 <=> _c3#55)) && (_c4#14 <=> _c4#56))
         :- Union
         :  :- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv
         :  +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#40]
         :     +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
         +- Aggregate [_c0#52, _c1#53, _c2#54, _c3#55, _c4#56], [_c0#52, _c1#53, _c2#54, _c3#55, _c4#56]
            +- Join LeftSemi, (((((_c0#52 <=> _c0#30) && (_c1#53 <=> _c1#31)) && (_c2#54 <=> _c2#32)) && (_c3#55 <=> _c3#33)) && (_c4#56 <=> _c4#46))
               :- Relation[_c0#52,_c1#53,_c2#54,_c3#55,_c4#56] csv
               +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#46]
                  +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv

Мы можем видеть, что Union и Join (пересечение) работают одновременно, что очень дорого, особенно Union, тогда как для второго запроса:

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Union
      :- LocalLimit 21
      :  +- Aggregate [_c0#10, _c1#11, _c2#12, _c3#13, _c4#14], [cast(_c0#10 as string) AS _c0#120, cast(_c1#11 as string) AS _c1#121, cast(_c2#12 as string) AS _c2#122, cast(_c3#13 as string) AS _c3#123, cast(_c4#14 as string) AS _c4#124]
      :     +- Join LeftAnti, (((((_c0#10 <=> _c0#30) && (_c1#11 <=> _c1#31)) && (_c2#12 <=> _c2#32)) && (_c3#13 <=> _c3#33)) && (_c4#14 <=> _c4#98))
      :        :- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv
      :        +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#98]
      :           +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
      +- LocalLimit 21
         +- Aggregate [_c0#30, _c1#31, _c2#32, _c3#33, _c4#104], [cast(_c0#30 as string) AS _c0#130, cast(_c1#31 as string) AS _c1#131, cast(_c2#32 as string) AS _c2#132, cast(_c3#33 as string) AS _c3#133, cast(_c4#104 as string) AS _c4#134]
            +- Join LeftAnti, (((((_c0#30 <=> _c0#10) && (_c1#31 <=> _c1#11)) && (_c2#32 <=> _c2#12)) && (_c3#33 <=> _c3#13)) && (_c4#104 <=> _c4#14))
               :- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#104]
               :  +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
               +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv

Одновременно работают два LeftAnti (относительные комплименты). Это занимает меньше места и является более эффективным. Это можно увидеть в SparkUI:

Первый запрос: First query Второй запрос: Second query

В первом случае этап 7 - Union является наиболее дорогостоящим, тогда как во втором случае этапы 42 и 41 (выше) относительно быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...