У меня есть несколько больших фреймов данных (около 30 ГБ), называемых as и bs, относительно небольшой фрейм данных (около 500 МБ ~ 1 ГБ), называемый spp.Я пытался кэшировать spp в память, чтобы избежать многократного чтения данных из базы данных или файлов.
Но я обнаружил, что если я кеширую spp, физический план показывает, что он не будет использовать широковещательное соединение, даже если spp включенпо функции трансляции.Однако, если я отменяю spp, план показывает, что он использует широковещательное соединение.
Кто-нибудь знаком с этим?
scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]
scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]
scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(idsegment#286L, 200)
: +- *Filter isnotnull(idsegment#286L)
: +- HiveTableScan [idsegmentpartner#282L, ssegmentsource#287, idsegment#286L], CatalogRelation `default`.`tblcustomsegmentcore`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [idcustomsegment#281L, idsegmentpartner#282L, ssegmentpartner#283, skey#284, svalue#285, idsegment#286L, ssegmentsource#287, datecreate#288]
+- *Sort [idAdnetProductSegment#91L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(idAdnetProductSegment#91L, 200)
+- *Filter isnotnull(idAdnetProductSegment#91L)
+- InMemoryTableScan [id#87L, idPartner#88, idSegmentPartner#89, sSegmentSourceArray#90, idAdnetProductSegment#91L, idPartnerProduct#92L, idFeed#93, idGlobalProduct#94, sBrand#95, sSku#96, sOnlineID#97, sGTIN#98, sProductCategory#99, sAvailability#100, sCondition#101, sDescription#102, sImageLink#103, sLink#104, sTitle#105, sMPN#106, sPrice#107, sAgeGroup#108, sColor#109, dateExpiration#110, sGender#111, sItemGroupId#112, sGoogleProductCategory#113, sMaterial#114, sPattern#115, sProductType#116, sSalePrice#117, sSalePriceEffectiveDate#118, sShipping#119, sShippingWeight#120, sShippingSize#121, sUnmappedAttributeList#122, sStatus#123, createdBy#124, updatedBy#125, dateCreate#126, dateUpdated#127, sProductKeyName#128, sProductKeyValue#129], [isnotnull(idAdnetProductSegment#91L)]
+- InMemoryRelation [id#87L, idPartner#88, idSegmentPartner#89, sSegmentSourceArray#90, idAdnetProductSegment#91L, idPartnerProduct#92L, idFeed#93, idGlobalProduct#94, sBrand#95, sSku#96, sOnlineID#97, sGTIN#98, sProductCategory#99, sAvailability#100, sCondition#101, sDescription#102, sImageLink#103, sLink#104, sTitle#105, sMPN#106, sPrice#107, sAgeGroup#108, sColor#109, dateExpiration#110, sGender#111, sItemGroupId#112, sGoogleProductCategory#113, sMaterial#114, sPattern#115, sProductType#116, sSalePrice#117, sSalePriceEffectiveDate#118, sShipping#119, sShippingWeight#120, sShippingSize#121, sUnmappedAttributeList#122, sStatus#123, createdBy#124, updatedBy#125, dateCreate#126, dateUpdated#127, sProductKeyName#128, sProductKeyValue#129], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(tblSegmentPartnerProduct) [numPartitions=1] [id#87L,idPartner#88,idSegmentPartner#89,sSegmentSourceArray#90,idAdnetProductSegment#91L,idPartnerProduct#92L,idFeed#93,idGlobalProduct#94,sBrand#95,sSku#96,sOnlineID#97,sGTIN#98,sProductCategory#99,sAvailability#100,sCondition#101,sDescription#102,sImageLink#103,sLink#104,sTitle#105,sMPN#106,sPrice#107,sAgeGroup#108,sColor#109,dateExpiration#110,sGender#111,sItemGroupId#112,sGoogleProductCategory#113,sMaterial#114,sPattern#115,sProductType#116,sSalePrice#117,sSalePriceEffectiveDate#118,sShipping#119,sShippingWeight#120,sShippingSize#121,sUnmappedAttributeList#122,sStatus#123,createdBy#124,updatedBy#125,dateCreate#126,dateUpdated#127,sProductKeyName#128,sProductKeyValue#129] ReadSchema: struct<id:bigint,idPartner:int,idSegmentPartner:int,sSegmentSourceArray:string,idAdnetProductSegm...
scala> spp.unpersist
res40: spp.type = [id: bigint, idPartner: int ... 41 more fields]
scala> as.explain
== Physical Plan ==
*BroadcastHashJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner, BuildRight
:- *Filter isnotnull(idsegment#286L)
: +- HiveTableScan [idsegmentpartner#282L, ssegmentsource#287, idsegment#286L], CatalogRelation `default`.`tblcustomsegmentcore`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [idcustomsegment#281L, idsegmentpartner#282L, ssegmentpartner#283, skey#284, svalue#285, idsegment#286L, ssegmentsource#287, datecreate#288]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[4, bigint, true]))
+- *Scan JDBCRelation(tblSegmentPartnerProduct) [numPartitions=1] [id#87L,idPartner#88,idSegmentPartner#89,sSegmentSourceArray#90,idAdnetProductSegment#91L,idPartnerProduct#92L,idFeed#93,idGlobalProduct#94,sBrand#95,sSku#96,sOnlineID#97,sGTIN#98,sProductCategory#99,sAvailability#100,sCondition#101,sDescription#102,sImageLink#103,sLink#104,sTitle#105,sMPN#106,sPrice#107,sAgeGroup#108,sColor#109,dateExpiration#110,sGender#111,sItemGroupId#112,sGoogleProductCategory#113,sMaterial#114,sPattern#115,sProductType#116,sSalePrice#117,sSalePriceEffectiveDate#118,sShipping#119,sShippingWeight#120,sShippingSize#121,sUnmappedAttributeList#122,sStatus#123,createdBy#124,updatedBy#125,dateCreate#126,dateUpdated#127,sProductKeyName#128,sProductKeyValue#129] PushedFilters: [*IsNotNull(idAdnetProductSegment)], ReadSchema: struct<id:bigint,idPartner:int,idSegmentPartner:int,sSegmentSourceArray:string,idAdnetProductSegm...