Что такое «ExistingRDD» и плохо ли это для плана запроса? - PullRequest
0 голосов
/ 24 апреля 2019

Из того, что я вижу, rdd.toDF() вводит PythonRDD, что становится ExistingRDD в плане запроса.

df1 = spark.range(100, numPartitions=5)
df2 = df1.rdd.toDF()

print(df1.rdd.toDebugString())
# (5) MapPartitionsRDD[2097] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2096] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2095] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2094] at javaToPython at <unknown>:0 []
#  |  ParallelCollectionRDD[2093] at javaToPython at <unknown>:0 []
print(df2.rdd.toDebugString())
# (5) MapPartitionsRDD[2132] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2131] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2130] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2129] at applySchemaToPythonRDD at <unknown>:0 []
#  |  MapPartitionsRDD[2128] at map at SerDeUtil.scala:137 []
#  |  MapPartitionsRDD[2127] at mapPartitions at SerDeUtil.scala:184 []
#  |  PythonRDD[2126] at RDD at PythonRDD.scala:53 []
#  |  MapPartitionsRDD[2097] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2096] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2095] at javaToPython at <unknown>:0 []
#  |  MapPartitionsRDD[2094] at javaToPython at <unknown>:0 []
#  |  ParallelCollectionRDD[2093] at javaToPython at <unknown>:0 []

Если я использую кэш DataFrame df1.cache(), Spark SQL достаточно умен, чтобы использовать кэш в запросе для эквивалентного СДР.

spark.range(100, numPartitions=5).groupby().count().explain()
# == Physical Plan ==
# *(2) HashAggregate(keys=[], functions=[count(1)])
# +- Exchange SinglePartition
#    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
#       +- *(1) InMemoryTableScan
#             +- InMemoryRelation [id#2525L], StorageLevel(disk, memory, deserialized, 1 replicas)
#                   +- *(1) Range (0, 100, step=1, splits=5)

Однако, ExistingRDD от этого не выигрывает.

df2.groupby().count().explain()
# == Physical Plan ==
# *(2) HashAggregate(keys=[], functions=[count(1)])
# +- Exchange SinglePartition
#    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
#       +- *(1) Project
#          +- Scan ExistingRDD[id#2573L]

Кажется, оптимизатор Spark SQL не отслеживает ExistingRDD. Это правда?

Будет ли по-прежнему полезен кеш RDD, если я использую df1.rdd.cache().count(), поскольку df2.rdd является потомком df1.rdd?

Я также хотел бы знать, какие операции будут формироваться ExistingRDD, если это создаст барьер для плана запроса и, следовательно, ухудшит производительность.

...