Хорошо, я разработал некоторый код для проверки того, что происходит, если объект, указанный в RDD, видоизменяется картографом, и я рад сообщить, что это невозможно, если вы программируете из Python.
Вот моя тестовая программа:
from pyspark.sql import SparkSession
import time
COUNT = 5
def funnydir(i):
"""Return a directory for i"""
return {"i":i,
"gen":0 }
def funnymap(d):
"""Take a directory and perform a funnymap"""
d['gen'] = d.get('gen',0) + 1
d['id' ] = id(d)
return d
if __name__=="__main__":
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
dfroot = sc.parallelize(range(COUNT)).map(funnydir)
dfroot.persist()
df1 = dfroot.map(funnymap)
df2 = df1.map(funnymap)
df3 = df2.map(funnymap)
df4 = df3.map(funnymap)
print("===========================================")
print("*** df1:",df1.collect())
print("*** df2:",df2.collect())
print("*** df3:",df3.collect())
print("*** df4:",df4.collect())
print("===========================================")
ef1 = dfroot.map(funnymap)
ef2 = ef1.map(funnymap)
ef3 = ef2.map(funnymap)
ef4 = ef3.map(funnymap)
print("*** ef1:",ef1.collect())
print("*** ef2:",ef2.collect())
print("*** ef3:",ef3.collect())
print("*** ef4:",ef4.collect())
Если вы запустите это, вы увидите, что идентификатор для словаря d
отличается в каждом из фреймов данных. Очевидно, Spark сериализует десериализацию объектов, когда они передаются от маппера к мапперу. Таким образом, каждый получает свою версию.
Если это не так, то первый вызов funnymap
для создания df1 также изменит генерацию во фрейме данных dfroot
, и в результате ef4 будет иметь разные номера генерации, чем df4.