Spark Dataframe: отсутствуют файлы при записи фрейма данных в CSV после разбиения по окну - PullRequest
0 голосов
/ 22 февраля 2020

У меня есть фрейм данных Spark, скажем, 10K ID. Каждая строка Dataframe состоит из пары ID и их евклидова расстояния (каждый ID представляет Документ. Структура данных выглядит следующим образом:

ID_source | ID_destination | Euclidean Distance
1           1                0.0
1           2                1.3777
1           3                1.38
.           .                .
.           .                .
.           .                .
2           1                0.5555
2           2                0.0
.           .                .
.           .                .
.           .                .

Для каждого ID_source я хочу иметь 10 лучших ID_destination согласно евклидову расстояние. Ну, в Spark мне удалось сделать это хорошо со следующими строками кода. Матрица, как описано выше, названа Similarity_join.

window = Window.orderBy(col("id_source")).partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())
df_filtered = similarity_join.select('*', rank().over(window).alias('rank')).orderBy(col("id_source").asc()).filter((col('rank') <= 10))

Проблема возникает, когда я хочу записать результат в CSV.

date_now =  datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_"+date_now 
df_filtered.write.csv(DESCRIPTION_MODEL_PATH+model_filename)

У меня отсутствуют идентификаторы в окончательном CSV (Получено после сжатия выходных данных в имел oop). Когда я использую низкую выборку (10-500), у меня есть все идентификаторы, но при использовании Пример 5000 идентификаторов, у меня много пропущенных идентификаторов в CSV. Похоже, что некоторые разделы не записаны на диске. Даже когда я использую coalesce (1), у меня та же проблема. Любая помощь, пожалуйста. Я использую 5 машины (1 мастер, 4 рабочих). И я намерен go до 10 миллионов ID, поэтому у меня будет окно 10 миллионов (разделов)

1 Ответ

0 голосов
/ 25 февраля 2020

Наконец, проблема не была ни в разделении, ни в письменной части. Но вместо этого это было связано с алгоритмом (Bucketed Random L SH), который строил фрейм данных (Similarity_join); Этот алгоритм не был детерминирован c, поэтому число результатов было различным в зависимости от выбранных случайных параметров.

...