A simple inner join followed by a count,

val postsAndUsers = posts.join(users, $"_OwnerUserId" === users("_Id"), "inner")
postsAndUsers.count

fails with the following error:
scala> postsAndUsers.count
[Stage 2:========================================> (397 + 36) / 527]org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#848L])
+- *(1) Project
+- *(1) BroadcastHashJoin [_OwnerUserId#8L], [_Id#147L], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- InMemoryTableScan [_OwnerUserId#8L]
: +- InMemoryRelation [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user/stackoverflow/Posts.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Posts.xml),StructType(StructField(_Id,LongType,false), StructField(_PostTypeId,ByteType,false), StructField(_AcceptedAnswerId,LongType,false), StructField(_ParentId,LongType,false), StructField(_CreationDate,StringType,false), StructField(_Score,IntegerType,false), StructField(_ViewCount,IntegerType,true), StructField(_Body,StringType,false), StructField(_OwnerUserId,LongType,false), StructField(_OwnerDisplayName,StringType,true), StructField(_LastEditorUserId,StringType,true), StructField(_LastEditorDisplayName,StringType,true), StructField(_LastEditDate,StringType,true), StructField(_LastActivityDate,StringType,false), StructField(_Title,StringType,true), StructField(_Tags,StringType,true), StructField(_AnswerCount,IntegerType,true), StructField(_CommentCount,IntegerType,true), StructField(_FavoriteCount,IntegerType,true), StructField(_ClosedDate,StringType,true), StructField(_CommunityOwnedDate,StringType,true))) [_Id#0L,_PostTypeId#1,_AcceptedAnswerId#2L,_ParentId#3L,_CreationDate#4,_Score#5,_ViewCount#6,_Body#7,_OwnerUserId#8L,_OwnerDisplayName#9,_LastEditorUserId#10,_LastEditorDisplayName#11,_LastEditDate#12,_LastActivityDate#13,_Title#14,_Tags#15,_AnswerCount#16,_CommentCount#17,_FavoriteCount#18,_ClosedDate#19,_CommunityOwnedDate#20] PushedFilters: [], ReadSchema: struct<_Id:bigint,_PostTypeId:tinyint,_AcceptedAnswerId:bigint,_ParentId:bigint,_CreationDate:str...
+- InMemoryTableScan [_Id#147L]
+- InMemoryRelation [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user/stackoverflow/Users.xml),Map(rowtag -> row, path -> hdfs://DR01/user/stackoverflow/Users.xml),StructType(StructField(_Id,LongType,false), StructField(_Reputation,LongType,true), StructField(_CreationDate,StringType,false), StructField(_DisplayName,StringType,true), StructField(_LastAccessDate,StringType,true), StructField(_WebsiteUrl,StringType,true), StructField(_Location,StringType,true), StructField(_AboutMe,StringType,true), StructField(_Views,LongType,true), StructField(_UpVotes,LongType,true), StructField(_DownVotes,LongType,true), StructField(_ProfileImageUrl,StringType,true), StructField(_EmailHash,StringType,true), StructField(_AccountId,LongType,true))) [_Id#147L,_Reputation#148L,_CreationDate#149,_DisplayName#150,_LastAccessDate#151,_WebsiteUrl#152,_Location#153,_AboutMe#154,_Views#155L,_UpVotes#156L,_DownVotes#157L,_ProfileImageUrl#158,_EmailHash#159,_AccountId#160L] PushedFilters: [], ReadSchema: struct<_Id:bigint,_Reputation:bigint,_CreationDate:string,_DisplayName:string,_LastAccessDate:str...
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
... 55 elided
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:208)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:179)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithoutKeys(HashAggregateExec.scala:234)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:163)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:39)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 79 more
Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
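The error message itself suggests two workarounds. As far as I understand, they would be applied roughly like this (a sketch I have not tried yet; the 300 seconds in the TimeoutException is the default of spark.sql.broadcastTimeout):

// Sketch, not tried yet: the two workarounds named in the error message.
// 1) Disable automatic broadcast joins for this session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// 2) Give the driver more memory; this cannot be changed at runtime and
//    has to be passed when the shell is started, e.g.
//    spark-shell --driver-memory 8g
// Raising the broadcast timeout (default 300 s) would be a third option:
// spark.conf.set("spark.sql.broadcastTimeout", 600)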
When I run explain(), however, it shows that a SortMergeJoin is used. My questions:
- Which of the two joins is actually used?
- Does a broadcast happen during a SortMergeJoin?
scala> postsAndUsers.explain()
== Physical Plan ==
*(3) SortMergeJoin [_OwnerUserId#8L], [_Id#147L], Inner
:- *(1) Sort [_OwnerUserId#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(_OwnerUserId#8L, 200)
: +- InMemoryTableScan [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20]
: +- InMemoryRelation [_Id#0L, _PostTypeId#1, _AcceptedAnswerId#2L, _ParentId#3L, _CreationDate#4, _Score#5, _ViewCount#6, _Body#7, _OwnerUserId#8L, _OwnerDisplayName#9, _LastEditorUserId#10, _LastEditorDisplayName#11, _LastEditDate#12, _LastActivityDate#13, _Title#14, _Tags#15, _AnswerCount#16, _CommentCount#17, _FavoriteCount#18, _ClosedDate#19, _CommunityOwnedDate#20], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user//stackoverflow/Posts.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Posts.xml),StructType(StructField(_Id,LongType,false), StructField(_PostTypeId,ByteType,false), StructField(_AcceptedAnswerId,LongType,false), StructField(_ParentId,LongType,false), StructField(_CreationDate,StringType,false), StructField(_Score,IntegerType,false), StructField(_ViewCount,IntegerType,true), StructField(_Body,StringType,false), StructField(_OwnerUserId,LongType,false), StructField(_OwnerDisplayName,StringType,true), StructField(_LastEditorUserId,StringType,true), StructField(_LastEditorDisplayName,StringType,true), StructField(_LastEditDate,StringType,true), StructField(_LastActivityDate,StringType,false), StructField(_Title,StringType,true), StructField(_Tags,StringType,true), StructField(_AnswerCount,IntegerType,true), StructField(_CommentCount,IntegerType,true), StructField(_FavoriteCount,IntegerType,true), StructField(_ClosedDate,StringType,true), StructField(_CommunityOwnedDate,StringType,true))) [_Id#0L,_PostTypeId#1,_AcceptedAnswerId#2L,_ParentId#3L,_CreationDate#4,_Score#5,_ViewCount#6,_Body#7,_OwnerUserId#8L,_OwnerDisplayName#9,_LastEditorUserId#10,_LastEditorDisplayName#11,_LastEditDate#12,_LastActivityDate#13,_Title#14,_Tags#15,_AnswerCount#16,_CommentCount#17,_FavoriteCount#18,_ClosedDate#19,_CommunityOwnedDate#20] PushedFilters: [], ReadSchema: struct<_Id:bigint,_PostTypeId:tinyint,_AcceptedAnswerId:bigint,_ParentId:bigint,_CreationDate:str...
+- *(2) Sort [_Id#147L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_Id#147L, 200)
+- InMemoryTableScan [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L]
+- InMemoryRelation [_Id#147L, _Reputation#148L, _CreationDate#149, _DisplayName#150, _LastAccessDate#151, _WebsiteUrl#152, _Location#153, _AboutMe#154, _Views#155L, _UpVotes#156L, _DownVotes#157L, _ProfileImageUrl#158, _EmailHash#159, _AccountId#160L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Scan XmlRelation(<function0>,Some(hdfs://DR01/user//stackoverflow/Users.xml),Map(rowtag -> row, path -> hdfs://DR01/user/ay185050/stackoverflow/Users.xml),StructType(StructField(_Id,LongType,false), StructField(_Reputation,LongType,true), StructField(_CreationDate,StringType,false), StructField(_DisplayName,StringType,true), StructField(_LastAccessDate,StringType,true), StructField(_WebsiteUrl,StringType,true), StructField(_Location,StringType,true), StructField(_AboutMe,StringType,true), StructField(_Views,LongType,true), StructField(_UpVotes,LongType,true), StructField(_DownVotes,LongType,true), StructField(_ProfileImageUrl,StringType,true), StructField(_EmailHash,StringType,true), StructField(_AccountId,LongType,true))) [_Id#147L,_Reputation#148L,_CreationDate#149,_DisplayName#150,_LastAccessDate#151,_WebsiteUrl#152,_Location#153,_AboutMe#154,_Views#155L,_UpVotes#156L,_DownVotes#157L,_ProfileImageUrl#158,_EmailHash#159,_AccountId#160L] PushedFilters: [], ReadSchema: struct<_Id:bigint,_Reputation:bigint,_CreationDate:string,_DisplayName:string,_LastAccessDate:str...
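For reference, I assume the plan that was actually executed can also be inspected directly through the Dataset's query execution (a sketch; I have not compared the outputs yet):

// Print the physical plan that is actually executed, plus the current
// broadcast threshold in bytes (tables smaller than it get broadcast).
println(postsAndUsers.queryExecution.executedPlan)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))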
Code
import org.apache.spark.sql.types._
import com.databricks.spark.xml._
import org.apache.spark.sql.functions._
val postSchema = StructType(
  Array(
    StructField("_Id", LongType, nullable = false),
    StructField("_PostTypeId", ByteType, nullable = false),
    StructField("_AcceptedAnswerId", LongType, nullable = false),
    StructField("_ParentId", LongType, nullable = false),
    StructField("_CreationDate", StringType, nullable = false),
    StructField("_Score", IntegerType, nullable = false),
    StructField("_ViewCount", IntegerType, nullable = true),
    StructField("_Body", StringType, nullable = false),
    StructField("_OwnerUserId", LongType, nullable = false),
    StructField("_OwnerDisplayName", StringType, nullable = true),
    StructField("_LastEditorUserId", StringType, nullable = true),
    StructField("_LastEditorDisplayName", StringType, nullable = true),
    StructField("_LastEditDate", StringType, nullable = true),
    StructField("_LastActivityDate", StringType, nullable = false),
    StructField("_Title", StringType, nullable = true),
    StructField("_Tags", StringType, nullable = true),
    StructField("_AnswerCount", IntegerType, nullable = true),
    StructField("_CommentCount", IntegerType, nullable = true),
    StructField("_FavoriteCount", IntegerType, nullable = true),
    StructField("_ClosedDate", StringType, nullable = true),
    StructField("_CommunityOwnedDate", StringType, nullable = true)
  ))
val userSchema = StructType(
  Array(
    StructField("_Id", LongType, nullable = false),
    StructField("_Reputation", LongType, nullable = true),
    StructField("_CreationDate", StringType, nullable = false),
    StructField("_DisplayName", StringType, nullable = true),
    StructField("_LastAccessDate", StringType, nullable = true),
    StructField("_WebsiteUrl", StringType, nullable = true),
    StructField("_Location", StringType, nullable = true),
    StructField("_AboutMe", StringType, nullable = true),
    StructField("_Views", LongType, nullable = true),
    StructField("_UpVotes", LongType, nullable = true),
    StructField("_DownVotes", LongType, nullable = true),
    StructField("_ProfileImageUrl", StringType, nullable = true),
    StructField("_EmailHash", StringType, nullable = true),
    StructField("_AccountId", LongType, nullable = true)
  )
)
val posts = spark.read
  .option("rowTag", "row")
  .schema(postSchema)
  .xml("hdfs://DR01/user/stackoverflow/Posts.xml")
  .cache

val users = spark.read
  .option("rowTag", "row")
  .schema(userSchema)
  .xml("hdfs://DR01/user/stackoverflow/Users.xml")
  .cache

val postsAndUsers = posts.join(users, $"_OwnerUserId" === users("_Id"), "inner")
postsAndUsers.count
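For completeness, this is how I understand an explicit broadcast of the smaller side would be requested (a hypothetical variant with an assumed variable name, not the code I actually ran):

// Hypothetical variant: explicitly hint that users should be broadcast,
// using broadcast() from org.apache.spark.sql.functions (imported above).
val postsAndUsersHinted = posts.join(broadcast(users), $"_OwnerUserId" === users("_Id"), "inner")
postsAndUsersHinted.count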