SortMergeJoin or BroadcastHashJoin?
28 May 2019

A simple val postsAndUsers = posts.join(users, $"_OwnerUserId" === users("_Id"), "inner") fails on count with the following error:

scala> postsAndUsers.count
[Stage 2:========================================>             (397 + 36) / 527]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#848L])
   +- *(1) Project
      +- *(1) BroadcastHashJoin [_OwnerUserId#8L], [_Id#147L], Inner, BuildLeft
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
         :  +- InMemoryTableScan [_OwnerUserId#8L]
         :        +- InMemoryRelation [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         :              +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user/stackoverflow/Posts.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Posts.xml),StructType(StructField(_Id,LongType,false), StructField(_PostTypeId,ByteType,false), StructField(_AcceptedAnswerId,LongType,false), StructField(_ParentId,LongType,false), StructField(_CreationDate,StringType,false), StructField(_Score,IntegerType,false), StructField(_ViewCount,IntegerType,true), StructField(_Body,StringType,false), StructField(_OwnerUserId,LongType,false), StructField(_OwnerDisplayName,StringType,true), StructField(_LastEditorUserId,StringType,true), StructField(_LastEditorDisplayName,StringType,true), StructField(_LastEditDate,StringType,true), StructField(_LastActivityDate,StringType,false), StructField(_Title,StringType,true), StructField(_Tags,StringType,true), StructField(_AnswerCount,IntegerType,true), StructField(_CommentCount,IntegerType,true), StructField(_FavoriteCount,IntegerType,true), StructField(_ClosedDate,StringType,true), StructField(_CommunityOwnedDate,StringType,true))) [_Id#0L,_PostTypeId#1,_AcceptedAnswerId#2L,_ParentId#3L,_CreationDate#4,_Score#5,_ViewCount#6,_Body#7,_OwnerUserId#8L,_OwnerDisplayName#9,_LastEditorUserId#10,_LastEditorDisplayName#11,_LastEditDate#12,_LastActivityDate#13,_Title#14,_Tags#15,_AnswerCount#16,_CommentCount#17,_FavoriteCount#18,_ClosedDate#19,_CommunityOwnedDate#20] PushedFilters: [], ReadSchema: struct<_Id:bigint,_PostTypeId:tinyint,_AcceptedAnswerId:bigint,_ParentId:bigint,_CreationDate:str...
         +- InMemoryTableScan [_Id#147L]
               +- InMemoryRelation [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user/stackoverflow/Users.xml),Map(rowtag -> row, path -> hdfs://DR01/user/stackoverflow/Users.xml),StructType(StructField(_Id,LongType,false), StructField(_Reputation,LongType,true), StructField(_CreationDate,StringType,false), StructField(_DisplayName,StringType,true), StructField(_LastAccessDate,StringType,true), StructField(_WebsiteUrl,StringType,true), StructField(_Location,StringType,true), StructField(_AboutMe,StringType,true), StructField(_Views,LongType,true), StructField(_UpVotes,LongType,true), StructField(_DownVotes,LongType,true), StructField(_ProfileImageUrl,StringType,true), StructField(_EmailHash,StringType,true), StructField(_AccountId,LongType,true))) [_Id#147L,_Reputation#148L,_CreationDate#149,_DisplayName#150,_LastAccessDate#151,_WebsiteUrl#152,_Location#153,_AboutMe#154,_Views#155L,_UpVotes#156L,_DownVotes#157L,_ProfileImageUrl#158,_EmailHash#159,_AccountId#160L] PushedFilters: [], ReadSchema: struct<_Id:bigint,_Reputation:bigint,_CreationDate:string,_DisplayName:string,_LastAccessDate:str...

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
  ... 55 elided
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
  at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:208)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:179)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithoutKeys(HashAggregateExec.scala:234)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:163)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:39)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 79 more

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
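The exception message itself names two workarounds. A minimal sketch of applying them in spark-shell, assuming Spark 2.x (the 8g figure below is an illustrative value, not a recommendation):

// Workaround 1 (quoted from the error): disable broadcast joins entirely,
// which forces the planner to fall back to SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Workaround 2 (quoted from the error): give the driver more memory.
// spark.driver.memory must be set before the driver JVM starts, e.g.:
//   spark-shell --driver-memory 8g   (8g is an assumed example value)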

However, when I call explain(), it shows that a SortMergeJoin is used. My questions:

  1. Which of the two is actually used? (A diagnostic sketch follows the plan below.)
  2. Is there a broadcast during a SortMergeJoin?
scala> postsAndUsers.explain()
== Physical Plan ==
*(3) SortMergeJoin [_OwnerUserId#8L], [_Id#147L], Inner
:- *(1) Sort [_OwnerUserId#8L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_OwnerUserId#8L, 200)
:     +- InMemoryTableScan [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20]
:           +- InMemoryRelation [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:                 +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user//stackoverflow/Posts.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Posts.xml),StructType(StructField(_Id,LongType,false), StructField(_PostTypeId,ByteType,false), StructField(_AcceptedAnswerId,LongType,false), StructField(_ParentId,LongType,false), StructField(_CreationDate,StringType,false), StructField(_Score,IntegerType,false), StructField(_ViewCount,IntegerType,true), StructField(_Body,StringType,false), StructField(_OwnerUserId,LongType,false), StructField(_OwnerDisplayName,StringType,true), StructField(_LastEditorUserId,StringType,true), StructField(_LastEditorDisplayName,StringType,true), StructField(_LastEditDate,StringType,true), StructField(_LastActivityDate,StringType,false), StructField(_Title,StringType,true), StructField(_Tags,StringType,true), StructField(_AnswerCount,IntegerType,true), StructField(_CommentCount,IntegerType,true), StructField(_FavoriteCount,IntegerType,true), StructField(_ClosedDate,StringType,true), StructField(_CommunityOwnedDate,StringType,true))) [_Id#0L,_PostTypeId#1,_AcceptedAnswerId#2L,_ParentId#3L,_CreationDate#4,_Score#5,_ViewCount#6,_Body#7,_OwnerUserId#8L,_OwnerDisplayName#9,_LastEditorUserId#10,_LastEditorDisplayName#11,_LastEditDate#12,_LastActivityDate#13,_Title#14,_Tags#15,_AnswerCount#16,_CommentCount#17,_FavoriteCount#18,_ClosedDate#19,_CommunityOwnedDate#20] PushedFilters: [], ReadSchema: struct<_Id:bigint,_PostTypeId:tinyint,_AcceptedAnswerId:bigint,_ParentId:bigint,_CreationDate:str...
+- *(2) Sort [_Id#147L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_Id#147L, 200)
      +- InMemoryTableScan [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L]
            +- InMemoryRelation [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user//stackoverflow/Users.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Users.xml),StructType(StructField(_Id,LongType,false), StructField(_Reputation,LongType,true), StructField(_CreationDate,StringType,false), StructField(_DisplayName,StringType,true), StructField(_LastAccessDate,StringType,true), StructField(_WebsiteUrl,StringType,true), StructField(_Location,StringType,true), StructField(_AboutMe,StringType,true), StructField(_Views,LongType,true), StructField(_UpVotes,LongType,true), StructField(_DownVotes,LongType,true), StructField(_ProfileImageUrl,StringType,true), StructField(_EmailHash,StringType,true), StructField(_AccountId,LongType,true))) [_Id#147L,_Reputation#148L,_CreationDate#149,_DisplayName#150,_LastAccessDate#151,_WebsiteUrl#152,_Location#153,_AboutMe#154,_Views#155L,_UpVotes#156L,_DownVotes#157L,_ProfileImageUrl#158,_EmailHash#159,_AccountId#160L] PushedFilters: [], ReadSchema: struct<_Id:bigint,_Reputation:bigint,_CreationDate:string,_DisplayName:string,_LastAccessDate:str...
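Regarding question 1, one way to see which physical join actually runs, as opposed to what explain() prints at a given moment, is to inspect the executed plan directly; this is a diagnostic sketch using the standard Spark 2.x queryExecution API:

// explain() re-plans on each call; executedPlan shows the physical plan
// the query would execute in its current state.
println(postsAndUsers.queryExecution.executedPlan)

// The broadcast decision is driven by this threshold (10 MB by default):
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))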

Code

import org.apache.spark.sql.types._
import com.databricks.spark.xml._
import org.apache.spark.sql.functions._

val postSchema = StructType(
    Array(
      StructField("_Id", LongType, nullable = false),
      StructField("_PostTypeId", ByteType, nullable = false),
      StructField("_AcceptedAnswerId", LongType, nullable = false),
      StructField("_ParentId", LongType, nullable = false),
      StructField("_CreationDate", StringType, nullable = false),
      StructField("_Score", IntegerType, nullable = false),
      StructField("_ViewCount", IntegerType, nullable = true),
      StructField("_Body", StringType, nullable = false),
      StructField("_OwnerUserId", LongType, nullable = false),
      StructField("_OwnerDisplayName", StringType, nullable = true),
      StructField("_LastEditorUserId", StringType, nullable = true),
      StructField("_LastEditorDisplayName", StringType, nullable = true),
      StructField("_LastEditDate", StringType, nullable = true),
      StructField("_LastActivityDate", StringType, nullable = false),
      StructField("_Title", StringType, nullable = true),
      StructField("_Tags", StringType, nullable = true),
      StructField("_AnswerCount", IntegerType, nullable = true),
      StructField("_CommentCount", IntegerType, nullable = true),
      StructField("_FavoriteCount", IntegerType, nullable = true),
      StructField("_ClosedDate", StringType, nullable = true),
      StructField("_CommunityOwnedDate", StringType, nullable = true)
    ))

val userSchema = StructType(
    Array(
      StructField("_Id", LongType, nullable = false),
      StructField("_Reputation", LongType, nullable = true),
      StructField("_CreationDate", StringType, nullable = false),
      StructField("_DisplayName", StringType, nullable = true),
      StructField("_LastAccessDate", StringType, nullable = true),
      StructField("_WebsiteUrl", StringType, nullable = true),
      StructField("_Location", StringType, nullable = true),
      StructField("_AboutMe", StringType, nullable = true),
      StructField("_Views", LongType, nullable = true),
      StructField("_UpVotes", LongType, nullable = true),
      StructField("_Views", LongType, nullable = true),
      StructField("_UpVotes", LongType, nullable = true),
      StructField("_DownVotes", LongType, nullable = true),
      StructField("_ProfileImageUrl", StringType, nullable = true),
      StructField("_EmailHash", StringType, nullable = true),
      StructField("_AccountId", LongType, nullable = true)
    )
  )

val posts = spark.read
    .option("rowTag", "row")      // spark-xml: each <row> element becomes a record
    .schema(postSchema)
    .xml("hdfs://DR01/user/stackoverflow/Posts.xml")
    .cache                        // cache the parsed XML so it is not re-read on every action

val users = spark.read
    .option("rowTag", "row")
    .schema(userSchema)
    .xml("hdfs://DR01/user/stackoverflow/Users.xml")
    .cache

val postsAndUsers = posts.join(users, $"_OwnerUserId" === users("_Id"), "inner")

postsAndUsers.count
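
One possible explanation for seeing both plans, offered only as an assumption to test: both inputs are cached, and the planner's size estimate for a cached relation can change once the InMemoryRelation is actually materialized, so a plan printed before the caches are filled need not match the one printed afterwards. A sketch of checking that:

postsAndUsers.explain()   // plan while the caches are still lazy (estimated sizes)

posts.count               // materializes the posts cache
users.count               // materializes the users cache

postsAndUsers.explain()   // plan once real InMemoryRelation statistics exist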