Spark объединяет медленные итерации с использованием широковещательной переменной - PullRequest
0 голосов
/ 25 сентября 2018

Я применяю объединение для объединения 2 наборов данных: 1. когда у меня есть один большой набор данных и еще один маленький, я присоединяюсь к ним с помощью объединения, но это занимает слишком много времени в перетасовке, после применения широковещательного объединения также не так много улучшений, я получил

Физический план перед применением широковещательного соединения:

== Physical Plan ==
*BroadcastHashJoin [symbol#497, traderId#498, datetime#499], [ticker#523, buyerId#528, maxdate#518], Inner, BuildRight
:- *Project [symbol#497, traderId#498, datetime#499, buyorderqty#500, sellorderqty#501, position#502]
:  +- *Filter ((isnotnull(datetime#499) && isnotnull(symbol#497)) && isnotnull(traderId#498))
:     +- *FileScan csv [symbol#497,traderId#498,datetime#499,buyorderqty#500,sellorderqty#501,position#502] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://<hdfs://test.test.csv>, PartitionFilters: [], PushedFilters: [IsNotNull(datetime), IsNotNull(symbol), IsNotNull(traderId)], ReadSchema: struct<symbol:string,traderId:string,datetime:timestamp,buyorderqty:int,sellorderqty:int,position...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, timestamp, false]))
   +- *Filter isnotnull(maxdate#518)
      +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[max(datetime#499)])
         +- Exchange hashpartitioning(symbol#497, traderId#498, 200)
            +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[partial_max(datetime#499)])
               +- *Project [symbol#497, traderId#498, datetime#499]
                  +- *Filter (isnotnull(traderId#498) && isnotnull(symbol#497))
                     +- *FileScan csv [symbol#497,traderId#498,datetime#499] Batched: false, Format: CSV, Location: InMemoryFileIndex

После применения широковещательного соединения:

== Physical Plan ==
*Project [maxdate#518, symbol#497, traderId#498, position#502]
+- *BroadcastHashJoin [ticker#523, buyerId#528, maxdate#518], [symbol#497, traderId#498, datetime#499], Inner, BuildRight
   :- *Filter isnotnull(maxdate#518)
   :  +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[max(datetime#499)])
   :     +- Exchange hashpartitioning(symbol#497, traderId#498, 200)
   :        +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[partial_max(datetime#499)])
   :           +- *Project [symbol#497, traderId#498, datetime#499]
   :              +- *Filter (isnotnull(traderId#498) && isnotnull(symbol#497))
   :                 +- *FileScan csv [symbol#497,traderId#498,datetime#499] Batched: false, Format: CSV, Location: InMemoryFileIndex[ PartitionFilters: [], PushedFilters: [IsNotNull(traderId), IsNotNull(symbol)], ReadSchema: struct<symbol:string,traderId:string,datetime:timestamp>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, timestamp, true]))
      +- *Project [symbol#497, traderId#498, datetime#499, position#502]
         +- *Filter ((isnotnull(datetime#499) && isnotnull(symbol#497)) && isnotnull(traderId#498))
            +- *FileScan csv [symbol#497,traderId#498,datetime#499,position#502] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://., PartitionFilters: []

Я соединил эти два набора данных следующим образом:

Dataset<Row> testDs=ds.join(functions.broadcast(dsgrouped), dsgrouped.col("tick").equalTo(ds.col("symbol"))
                .and(dsgrouped.col("Id").equalTo(ds.col("traderId")))
        .and(dsgrouped.col("maxdate").equalTo(ds.col("datetime"))));

Я выполняю это для различных итераций, так что это идет медленно, так как количество итераций увеличилось. Может кто-нибудь предложить, что нужно сделать для повышения производительности? Даже если у вас есть 2 маленьких таблицы, как к ним присоединиться?

...