В настоящее время я построил модель BucketedRandomProjectionLSH
, чтобы вычислить сходство данных по approxNearestNeighbors
. Следующий код выглядит следующим образом:
df = sql_context.read.format("org.apache.spark.sql.cassandra").options(table="data", keyspace="spark").load()
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(df)
df_collected = df.collect()
for x in df_collected:
result = model.approxNearestNeighbors(df, x["features"], 30).collect()
write(result) ## save result to db
Я думаю, есть ли какой-нибудь метод, который может распределить рабочую нагрузку цикла for
for x in df_collected:
result = model.approxNearestNeighbors(df, x["features"], 30).collect()
write(result) ## save result to db
на каждую машину
Я обнаружил, что foreach
или foreachPartition
могут сделать работу, но я пытался. Они не позволяют мне пропустить rdd df
.
Есть ли лучшая практика по решению подобных проблем?