Вот фрагмент моего кода на python с использованием pyspark и фрейма данных:
N=10 # number of machines of the cluster
notes = spark.read.option("header", True).csv(path+filename, schema=n_schema).persist()
notes=notes.sample(0.05)
notes=notes.repartition(N,'user1').persist()
df = notes.groupBy('user1').agg(collect_list('nF').alias('films1'))
df2=df.withColumnRenamed('user1','user2')
df2=df2.withColumnRenamed('films1','films2')
sqlContext.sql("SET spark.sql.shuffle.partitions = 4")
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = 0")
j=df.crossJoin(df2)
j=j.filter(j.user1 != j.user2)
from pyspark.sql.functions import udf
@udf("float")
def sim(films1, films2):
s1 = set(films1)
s2 = set(films2)
return float(len(s1.intersection(s2)))/len(s1.union(s2))
Paire=j.select("user1","user2", sim("liste_films1","liste_films2").alias("sim"))
Paire.show(5)
print(Paire.dtypes)
[('user1', 'int'), ('user2', 'int'), ('sim', 'float')]
Предположим, у меня есть кластер из N = 10 машин, каждый из которых содержит свой раздел данных.Я хотел бы знать, что обмен данными происходит между этими машинами, особенно когда выполняются следующие инструкции (ленивое выполнение):
- j = df.crossJoin (df)
- j = j.filter (j.user1! = J.user2)
- Paire = j.select ("user1", "user2", ...)
Я посмотрел вкладку DAG и SQL на странице http://localhost:4040/, но она мне не помогла.
Может ли кто-нибудь мне помочь?