Apache Spark 2.2: широковещательное соединение не работает, когда вы уже кешируете фрейм данных, который хотите транслировать - PullRequest
4 голосов
/ 07 июня 2019

У меня есть несколько больших фреймов данных (около 30 ГБ), называемых as и bs, относительно небольшой фрейм данных (около 500 МБ ~ 1 ГБ), называемый spp.Я пытался кэшировать spp в память, чтобы избежать многократного чтения данных из базы данных или файлов.

Но я обнаружил, что если я кеширую spp, физический план показывает, что он не будет использовать широковещательное соединение, даже если spp включенпо функции трансляции.Однако, если я отменяю spp, план показывает, что он использует широковещательное соединение.

Кто-нибудь знаком с этим?

scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]

scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]

scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(idsegment#286L, 200)
:     +- *Filter isnotnull(idsegment#286L)
:        +- HiveTableScan [idsegmentpartner#282L, ssegmentsource#287, idsegment#286L], CatalogRelation `default`.`tblcustomsegmentcore`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [idcustomsegment#281L, idsegmentpartner#282L, ssegmentpartner#283, skey#284, svalue#285, idsegment#286L, ssegmentsource#287, datecreate#288]
+- *Sort [idAdnetProductSegment#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(idAdnetProductSegment#91L, 200)
      +- *Filter isnotnull(idAdnetProductSegment#91L)
         +- InMemoryTableScan [id#87L, idPartner#88, idSegmentPartner#89, sSegmentSourceArray#90, idAdnetProductSegment#91L, idPartnerProduct#92L, idFeed#93, idGlobalProduct#94, sBrand#95, sSku#96, sOnlineID#97, sGTIN#98, sProductCategory#99, sAvailability#100, sCondition#101, sDescription#102, sImageLink#103, sLink#104, sTitle#105, sMPN#106, sPrice#107, sAgeGroup#108, sColor#109, dateExpiration#110, sGender#111, sItemGroupId#112, sGoogleProductCategory#113, sMaterial#114, sPattern#115, sProductType#116, sSalePrice#117, sSalePriceEffectiveDate#118, sShipping#119, sShippingWeight#120, sShippingSize#121, sUnmappedAttributeList#122, sStatus#123, createdBy#124, updatedBy#125, dateCreate#126, dateUpdated#127, sProductKeyName#128, sProductKeyValue#129], [isnotnull(idAdnetProductSegment#91L)]
               +- InMemoryRelation [id#87L, idPartner#88, idSegmentPartner#89, sSegmentSourceArray#90, idAdnetProductSegment#91L, idPartnerProduct#92L, idFeed#93, idGlobalProduct#94, sBrand#95, sSku#96, sOnlineID#97, sGTIN#98, sProductCategory#99, sAvailability#100, sCondition#101, sDescription#102, sImageLink#103, sLink#104, sTitle#105, sMPN#106, sPrice#107, sAgeGroup#108, sColor#109, dateExpiration#110, sGender#111, sItemGroupId#112, sGoogleProductCategory#113, sMaterial#114, sPattern#115, sProductType#116, sSalePrice#117, sSalePriceEffectiveDate#118, sShipping#119, sShippingWeight#120, sShippingSize#121, sUnmappedAttributeList#122, sStatus#123, createdBy#124, updatedBy#125, dateCreate#126, dateUpdated#127, sProductKeyName#128, sProductKeyValue#129], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Scan JDBCRelation(tblSegmentPartnerProduct) [numPartitions=1] [id#87L,idPartner#88,idSegmentPartner#89,sSegmentSourceArray#90,idAdnetProductSegment#91L,idPartnerProduct#92L,idFeed#93,idGlobalProduct#94,sBrand#95,sSku#96,sOnlineID#97,sGTIN#98,sProductCategory#99,sAvailability#100,sCondition#101,sDescription#102,sImageLink#103,sLink#104,sTitle#105,sMPN#106,sPrice#107,sAgeGroup#108,sColor#109,dateExpiration#110,sGender#111,sItemGroupId#112,sGoogleProductCategory#113,sMaterial#114,sPattern#115,sProductType#116,sSalePrice#117,sSalePriceEffectiveDate#118,sShipping#119,sShippingWeight#120,sShippingSize#121,sUnmappedAttributeList#122,sStatus#123,createdBy#124,updatedBy#125,dateCreate#126,dateUpdated#127,sProductKeyName#128,sProductKeyValue#129] ReadSchema: struct<id:bigint,idPartner:int,idSegmentPartner:int,sSegmentSourceArray:string,idAdnetProductSegm...

scala> spp.unpersist
res40: spp.type = [id: bigint, idPartner: int ... 41 more fields]

scala> as.explain
== Physical Plan ==
*BroadcastHashJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner, BuildRight
:- *Filter isnotnull(idsegment#286L)
:  +- HiveTableScan [idsegmentpartner#282L, ssegmentsource#287, idsegment#286L], CatalogRelation `default`.`tblcustomsegmentcore`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [idcustomsegment#281L, idsegmentpartner#282L, ssegmentpartner#283, skey#284, svalue#285, idsegment#286L, ssegmentsource#287, datecreate#288]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[4, bigint, true]))
   +- *Scan JDBCRelation(tblSegmentPartnerProduct) [numPartitions=1] [id#87L,idPartner#88,idSegmentPartner#89,sSegmentSourceArray#90,idAdnetProductSegment#91L,idPartnerProduct#92L,idFeed#93,idGlobalProduct#94,sBrand#95,sSku#96,sOnlineID#97,sGTIN#98,sProductCategory#99,sAvailability#100,sCondition#101,sDescription#102,sImageLink#103,sLink#104,sTitle#105,sMPN#106,sPrice#107,sAgeGroup#108,sColor#109,dateExpiration#110,sGender#111,sItemGroupId#112,sGoogleProductCategory#113,sMaterial#114,sPattern#115,sProductType#116,sSalePrice#117,sSalePriceEffectiveDate#118,sShipping#119,sShippingWeight#120,sShippingSize#121,sUnmappedAttributeList#122,sStatus#123,createdBy#124,updatedBy#125,dateCreate#126,dateUpdated#127,sProductKeyName#128,sProductKeyValue#129] PushedFilters: [*IsNotNull(idAdnetProductSegment)], ReadSchema: struct<id:bigint,idPartner:int,idSegmentPartner:int,sSegmentSourceArray:string,idAdnetProductSegm...

1 Ответ

4 голосов
/ 08 июня 2019

Это происходит, когда анализируемый план пытается использовать данные кэша.Он проглатывает информацию ResolvedHint, предоставленную пользователем ( код ).
Если мы попытаемся сделать df.explain(true), мы увидим, что подсказка теряется между анализируемым и оптимизированным планом, гдеSpark пытается использовать кэшированные данные.Эта проблема была исправлена ​​в последней версии Spark (при нескольких попытках).
последняя версия jira: https://issues.apache.org/jira/browse/SPARK-27674.
Код исправления (чтобы учесть подсказку при использовании кэшированных таблиц): https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219

...