У меня есть фрейм данных Spark, скажем, 10K ID. Каждая строка Dataframe состоит из пары ID и их евклидова расстояния (каждый ID представляет Документ. Структура данных выглядит следующим образом:
ID_source | ID_destination | Euclidean Distance
1 1 0.0
1 2 1.3777
1 3 1.38
. . .
. . .
. . .
2 1 0.5555
2 2 0.0
. . .
. . .
. . .
Для каждого ID_source я хочу иметь 10 лучших ID_destination согласно евклидову расстояние. Ну, в Spark мне удалось сделать это хорошо со следующими строками кода. Матрица, как описано выше, названа Similarity_join.
window = Window.orderBy(col("id_source")).partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())
df_filtered = similarity_join.select('*', rank().over(window).alias('rank')).orderBy(col("id_source").asc()).filter((col('rank') <= 10))
Проблема возникает, когда я хочу записать результат в CSV.
date_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_"+date_now
df_filtered.write.csv(DESCRIPTION_MODEL_PATH+model_filename)
У меня отсутствуют идентификаторы в окончательном CSV (Получено после сжатия выходных данных в имел oop). Когда я использую низкую выборку (10-500), у меня есть все идентификаторы, но при использовании Пример 5000 идентификаторов, у меня много пропущенных идентификаторов в CSV. Похоже, что некоторые разделы не записаны на диске. Даже когда я использую coalesce (1), у меня та же проблема. Любая помощь, пожалуйста. Я использую 5 машины (1 мастер, 4 рабочих). И я намерен go до 10 миллионов ID, поэтому у меня будет окно 10 миллионов (разделов)