Из того, что я вижу, rdd.toDF()
вводит PythonRDD
, что становится ExistingRDD
в плане запроса.
df1 = spark.range(100, numPartitions=5)
df2 = df1.rdd.toDF()
print(df1.rdd.toDebugString())
# (5) MapPartitionsRDD[2097] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2096] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2095] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2094] at javaToPython at <unknown>:0 []
# | ParallelCollectionRDD[2093] at javaToPython at <unknown>:0 []
print(df2.rdd.toDebugString())
# (5) MapPartitionsRDD[2132] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2131] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2130] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2129] at applySchemaToPythonRDD at <unknown>:0 []
# | MapPartitionsRDD[2128] at map at SerDeUtil.scala:137 []
# | MapPartitionsRDD[2127] at mapPartitions at SerDeUtil.scala:184 []
# | PythonRDD[2126] at RDD at PythonRDD.scala:53 []
# | MapPartitionsRDD[2097] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2096] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2095] at javaToPython at <unknown>:0 []
# | MapPartitionsRDD[2094] at javaToPython at <unknown>:0 []
# | ParallelCollectionRDD[2093] at javaToPython at <unknown>:0 []
Если я использую кэш DataFrame df1.cache()
, Spark SQL достаточно умен, чтобы использовать кэш в запросе для эквивалентного СДР.
spark.range(100, numPartitions=5).groupby().count().explain()
# == Physical Plan ==
# *(2) HashAggregate(keys=[], functions=[count(1)])
# +- Exchange SinglePartition
# +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
# +- *(1) InMemoryTableScan
# +- InMemoryRelation [id#2525L], StorageLevel(disk, memory, deserialized, 1 replicas)
# +- *(1) Range (0, 100, step=1, splits=5)
Однако, ExistingRDD
от этого не выигрывает.
df2.groupby().count().explain()
# == Physical Plan ==
# *(2) HashAggregate(keys=[], functions=[count(1)])
# +- Exchange SinglePartition
# +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
# +- *(1) Project
# +- Scan ExistingRDD[id#2573L]
Кажется, оптимизатор Spark SQL не отслеживает ExistingRDD
. Это правда?
Будет ли по-прежнему полезен кеш RDD, если я использую df1.rdd.cache().count()
, поскольку df2.rdd
является потомком df1.rdd
?
Я также хотел бы знать, какие операции будут формироваться ExistingRDD
, если это создаст барьер для плана запроса и, следовательно, ухудшит производительность.