Объединение больших данных в потоковой передаче с искрой - PullRequest
0 голосов
/ 14 февраля 2019

У нас есть большая таблица клиентов с 7 миллионами записей, и мы пытаемся обработать некоторые данные транзакции (500K сообщений на пакет), поступающие из потока kafka.

Во время обработки нам нужно присоединиться к транзакцииданные с данными клиента.В настоящее время это занимает около 10 секунд, и требуется снизить его до 5 секунд.Поскольку таблица клиентов слишком велика, мы не можем использовать широковещательное соединение.Есть ли какая-либо другая оптимизация, которую мы можем сделать?

== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Project
   +- Join Inner, Some((custId#110 = rowkey#0))
      :- Project [rowkey#0]
      :  +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
      :     +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
      +- Project [custId#110]
         +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#119L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#122L])
      +- Project
         +- SortMergeJoin [rowkey#0], [custId#110]
            :- Sort [rowkey#0 ASC], false, 0
            :  +- TungstenExchange hashpartitioning(rowkey#0,200), None
            :     +- Project [rowkey#0]
            :        +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
            :           +- HiveTableScan [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4], MetastoreRelation db_localhost, ext_sub_cust_profile, None
            +- Sort [custId#110 ASC], false, 0
               +- TungstenExchange hashpartitioning(custId#110,200), None
                  +- Project [custId#110]
                     +- Scan ExistingRDD[key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118]

1 Ответ

0 голосов
/ 14 февраля 2019
  1. Предполагая, что данные клиента постоянны для мини-пакетов, разделите эти данные клиента на customerId, используя хеш-разделитель, и кешируйте их в RDD / DF.
  2. Поскольку данные транзакций поступают из Kafka, эти данные могуттакже может быть разделен на один и тот же ключ с использованием хеш-разделителя при публикации в Kafka https://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html

Это должно сократить время соединения двух наборов данных, но единственное условие - ключ раздела должен быть одинаковым в обоих наборах данных (данные транзакции и клиентданные).

...