У нас есть большая таблица клиентов с 7 миллионами записей, и мы пытаемся обработать некоторые данные транзакции (500K сообщений на пакет), поступающие из потока kafka.
Во время обработки нам нужно присоединиться к транзакцииданные с данными клиента.В настоящее время это занимает около 10 секунд, и требуется снизить его до 5 секунд.Поскольку таблица клиентов слишком велика, мы не можем использовать широковещательное соединение.Есть ли какая-либо другая оптимизация, которую мы можем сделать?
== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
:- Subquery custProfile
: +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- Subquery jz_view_sub_cust_profile
: +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
:- Subquery custProfile
: +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- Subquery jz_view_sub_cust_profile
: +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Project
+- Join Inner, Some((custId#110 = rowkey#0))
:- Project [rowkey#0]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- Project [custId#110]
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#119L])
+- TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#122L])
+- Project
+- SortMergeJoin [rowkey#0], [custId#110]
:- Sort [rowkey#0 ASC], false, 0
: +- TungstenExchange hashpartitioning(rowkey#0,200), None
: +- Project [rowkey#0]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- HiveTableScan [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4], MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- Sort [custId#110 ASC], false, 0
+- TungstenExchange hashpartitioning(custId#110,200), None
+- Project [custId#110]
+- Scan ExistingRDD[key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118]