Pyspark: как происходит обмен данными между датодами во время следующих SQL-запросов? - PullRequest
0 голосов
/ 25 сентября 2019

Вот фрагмент моего кода на python с использованием pyspark и фрейма данных:

N=10 # number of machines of the cluster
notes = spark.read.option("header", True).csv(path+filename, schema=n_schema).persist()
notes=notes.sample(0.05)
notes=notes.repartition(N,'user1').persist()
df = notes.groupBy('user1').agg(collect_list('nF').alias('films1'))

df2=df.withColumnRenamed('user1','user2')
df2=df2.withColumnRenamed('films1','films2')
sqlContext.sql("SET spark.sql.shuffle.partitions = 4")
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = 0")
j=df.crossJoin(df2)
j=j.filter(j.user1 != j.user2)
from pyspark.sql.functions import udf
@udf("float")
def sim(films1, films2):
   s1 = set(films1)
   s2 = set(films2)
   return float(len(s1.intersection(s2)))/len(s1.union(s2))
Paire=j.select("user1","user2", sim("liste_films1","liste_films2").alias("sim"))
Paire.show(5)
print(Paire.dtypes)
[('user1', 'int'), ('user2', 'int'), ('sim', 'float')]

Предположим, у меня есть кластер из N = 10 машин, каждый из которых содержит свой раздел данных.Я хотел бы знать, что обмен данными происходит между этими машинами, особенно когда выполняются следующие инструкции (ленивое выполнение):

  • j = df.crossJoin (df)
  • j = j.filter (j.user1! = J.user2)
  • Paire = j.select ("user1", "user2", ...)

Я посмотрел вкладку DAG и SQL на странице http://localhost:4040/, но она мне не помогла.

Может ли кто-нибудь мне помочь?

...