Любопытное.Он опирается на то, что у Spark достаточно информации, чтобы вывести зависимость.
Например, я повторил ваш пример, как описано:
from pyspark.sql.functions import hash
def f(spark, filename):
df=spark.read.csv(filename)
df2=df.select(hash('_c1').alias('hashc2'))
df3=df2.select(hash('hashc2').alias('hashc3'))
df4=df3.select(hash('hashc3').alias('hashc4'))
return df4
filename = 'some-valid-file.csv'
df_a = f(spark, filename)
df_b = f(spark, filename)
assert df_a != df_b
df_joined = df_a.join(df_b, df_a.hashc4==df_b.hashc4, how='left')
Если я объясню этот результирующий кадр данных, используя df_joined.explain(extended=True)
, я увижуследующие четыре плана:
== Parsed Logical Plan ==
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hashc3#18, 42) AS hashc4#20]
: +- Project [hash(hashc2#16, 42) AS hashc3#18]
: +- Project [hash(_c1#11, 42) AS hashc2#16]
: +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hashc3#40, 42) AS hashc4#42]
+- Project [hash(hashc2#38, 42) AS hashc3#40]
+- Project [hash(_c1#33, 42) AS hashc2#38]
+- Relation[_c0#32,_c1#33,_c2#34] csv
== Analyzed Logical Plan ==
hashc4: int, hashc4: int
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hashc3#18, 42) AS hashc4#20]
: +- Project [hash(hashc2#16, 42) AS hashc3#18]
: +- Project [hash(_c1#11, 42) AS hashc2#16]
: +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hashc3#40, 42) AS hashc4#42]
+- Project [hash(hashc2#38, 42) AS hashc3#40]
+- Project [hash(_c1#33, 42) AS hashc2#38]
+- Relation[_c0#32,_c1#33,_c2#34] csv
== Optimized Logical Plan ==
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hash(hash(_c1#11, 42), 42), 42) AS hashc4#20]
: +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hash(hash(_c1#33, 42), 42), 42) AS hashc4#42]
+- Relation[_c0#32,_c1#33,_c2#34] csv
== Physical Plan ==
SortMergeJoin [hashc4#20], [hashc4#42], LeftOuter
:- *(2) Sort [hashc4#20 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(hashc4#20, 200)
: +- *(1) Project [hash(hash(hash(_c1#11, 42), 42), 42) AS hashc4#20]
: +- *(1) FileScan csv [_c1#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file: some-valid-file.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c1:string>
+- *(4) Sort [hashc4#42 ASC NULLS FIRST], false, 0
+- ReusedExchange [hashc4#42], Exchange hashpartitioning(hashc4#20, 200)
Физический план выше считывает CSV только один раз и повторно использует все вычисления, поскольку Spark обнаруживает, чтодва FileScan
идентичны (т. е. Spark знает, что они не являются независимыми).
Теперь рассмотрим, заменю ли я read.csv
независимыми, но идентичными СДР ручной работы.
from pyspark.sql.functions import hash
def g(spark):
df=spark.createDataFrame([('a', 'a'), ('b', 'b'), ('c', 'c')], ["_c1", "_c2"])
df2=df.select(hash('_c1').alias('hashc2'))
df3=df2.select(hash('hashc2').alias('hashc3'))
df4=df3.select(hash('hashc3').alias('hashc4'))
return df4
df_c = g(spark)
df_d = g(spark)
df_joined = df_c.join(df_d, df_c.hashc4==df_d.hashc4, how='left')
В этом случае физический план Спарка сканирует два разных RDD.Вот результат выполнения df_joined.explain(extended=True)
для подтверждения.
== Parsed Logical Plan ==
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hashc3#6, 42) AS hashc4#8]
: +- Project [hash(hashc2#4, 42) AS hashc3#6]
: +- Project [hash(_c1#0, 42) AS hashc2#4]
: +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hashc3#16, 42) AS hashc4#18]
+- Project [hash(hashc2#14, 42) AS hashc3#16]
+- Project [hash(_c1#10, 42) AS hashc2#14]
+- LogicalRDD [_c1#10, _c2#11], false
== Analyzed Logical Plan ==
hashc4: int, hashc4: int
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hashc3#6, 42) AS hashc4#8]
: +- Project [hash(hashc2#4, 42) AS hashc3#6]
: +- Project [hash(_c1#0, 42) AS hashc2#4]
: +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hashc3#16, 42) AS hashc4#18]
+- Project [hash(hashc2#14, 42) AS hashc3#16]
+- Project [hash(_c1#10, 42) AS hashc2#14]
+- LogicalRDD [_c1#10, _c2#11], false
== Optimized Logical Plan ==
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hash(hash(_c1#0, 42), 42), 42) AS hashc4#8]
: +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hash(hash(_c1#10, 42), 42), 42) AS hashc4#18]
+- LogicalRDD [_c1#10, _c2#11], false
== Physical Plan ==
SortMergeJoin [hashc4#8], [hashc4#18], LeftOuter
:- *(2) Sort [hashc4#8 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(hashc4#8, 200)
: +- *(1) Project [hash(hash(hash(_c1#0, 42), 42), 42) AS hashc4#8]
: +- Scan ExistingRDD[_c1#0,_c2#1]
+- *(4) Sort [hashc4#18 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(hashc4#18, 200)
+- *(3) Project [hash(hash(hash(_c1#10, 42), 42), 42) AS hashc4#18]
+- Scan ExistingRDD[_c1#10,_c2#11]
Это не совсем поведение PySpark.