Я применяю объединение для объединения 2 наборов данных: 1. когда у меня есть один большой набор данных и еще один маленький, я присоединяюсь к ним с помощью объединения, но это занимает слишком много времени в перетасовке, после применения широковещательного объединения также не так много улучшений, я получил
Физический план перед применением широковещательного соединения:
== Physical Plan ==
*BroadcastHashJoin [symbol#497, traderId#498, datetime#499], [ticker#523, buyerId#528, maxdate#518], Inner, BuildRight
:- *Project [symbol#497, traderId#498, datetime#499, buyorderqty#500, sellorderqty#501, position#502]
: +- *Filter ((isnotnull(datetime#499) && isnotnull(symbol#497)) && isnotnull(traderId#498))
: +- *FileScan csv [symbol#497,traderId#498,datetime#499,buyorderqty#500,sellorderqty#501,position#502] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://<hdfs://test.test.csv>, PartitionFilters: [], PushedFilters: [IsNotNull(datetime), IsNotNull(symbol), IsNotNull(traderId)], ReadSchema: struct<symbol:string,traderId:string,datetime:timestamp,buyorderqty:int,sellorderqty:int,position...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, timestamp, false]))
+- *Filter isnotnull(maxdate#518)
+- *HashAggregate(keys=[symbol#497, traderId#498], functions=[max(datetime#499)])
+- Exchange hashpartitioning(symbol#497, traderId#498, 200)
+- *HashAggregate(keys=[symbol#497, traderId#498], functions=[partial_max(datetime#499)])
+- *Project [symbol#497, traderId#498, datetime#499]
+- *Filter (isnotnull(traderId#498) && isnotnull(symbol#497))
+- *FileScan csv [symbol#497,traderId#498,datetime#499] Batched: false, Format: CSV, Location: InMemoryFileIndex
После применения широковещательного соединения:
== Physical Plan ==
*Project [maxdate#518, symbol#497, traderId#498, position#502]
+- *BroadcastHashJoin [ticker#523, buyerId#528, maxdate#518], [symbol#497, traderId#498, datetime#499], Inner, BuildRight
:- *Filter isnotnull(maxdate#518)
: +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[max(datetime#499)])
: +- Exchange hashpartitioning(symbol#497, traderId#498, 200)
: +- *HashAggregate(keys=[symbol#497, traderId#498], functions=[partial_max(datetime#499)])
: +- *Project [symbol#497, traderId#498, datetime#499]
: +- *Filter (isnotnull(traderId#498) && isnotnull(symbol#497))
: +- *FileScan csv [symbol#497,traderId#498,datetime#499] Batched: false, Format: CSV, Location: InMemoryFileIndex[ PartitionFilters: [], PushedFilters: [IsNotNull(traderId), IsNotNull(symbol)], ReadSchema: struct<symbol:string,traderId:string,datetime:timestamp>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, timestamp, true]))
+- *Project [symbol#497, traderId#498, datetime#499, position#502]
+- *Filter ((isnotnull(datetime#499) && isnotnull(symbol#497)) && isnotnull(traderId#498))
+- *FileScan csv [symbol#497,traderId#498,datetime#499,position#502] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://., PartitionFilters: []
Я соединил эти два набора данных следующим образом:
Dataset<Row> testDs=ds.join(functions.broadcast(dsgrouped), dsgrouped.col("tick").equalTo(ds.col("symbol"))
.and(dsgrouped.col("Id").equalTo(ds.col("traderId")))
.and(dsgrouped.col("maxdate").equalTo(ds.col("datetime"))));
Я выполняю это для различных итераций, так что это идет медленно, так как количество итераций увеличилось. Может кто-нибудь предложить, что нужно сделать для повышения производительности? Даже если у вас есть 2 маленьких таблицы, как к ним присоединиться?