PySpark: непоследовательный результат count () после объединения - PullRequest
0 голосов
/ 04 декабря 2018

Я полностью сбит с толку следующей проблемой:

Когда я соединяю 2 фрейма данных и возвращаю количество строк, я получаю немного разные счета при каждой попытке.Вот подробности:

Я хотел бы объединить кадры данных: 'df_user_ids' и 'df_conversions':

df_user_ids.show()
>>>
+--------------------+
|             user_id|
+--------------------+
|AMsySZY-cqcufnXst...|
|AMsySZY1Oo75A6vKU...|
|AMsySZY4nbqZiuEMR...|
|AMsySZY5RSfgj6Xvi...|
|AMsySZY5geAmTx0er...|
|AMsySZY6Gskv_kEAv...|
|AMsySZY6MIOyPWM4U...|
|AMsySZYCEZYS00UB9...| 

df_conversions.show()
>>>
+--------------------+----------------------+---------+
|             user_id|time_activity_observed|converted|
+--------------------+----------------------+---------+
|CAESEAl1YPOZpaWVx...|   2018-03-23 12:15:37|        1|
|CAESEAuvSBzmfc_f3...|   2018-03-23 21:58:25|        1|
|CAESEBXWsSYm4ntvR...|   2018-03-30 12:16:53|        1|
|CAESEC-5uPwWGFdnv...|   2018-03-23 08:52:48|        1|
|CAESEDB3Z-NNvz7zL...|   2018-03-24 21:37:05|        1|
|CAESEDu7S7rGTVlj2...|   2018-04-01 17:00:12|        1|
|CAESEE4s6g1-JlUEt...|   2018-03-23 19:32:23|        1|
|CAESEELlJt0mE2xjn...|   2018-03-24 18:26:15|        1|

Оба кадра данных имеют ключевой столбецnamed: "user_id", и оба создаются с использованием ".sampleBy ()" с фиксированным начальным числом:

.sampleBy("converted", fractions={0: 0.035, 1: 1}, seed=0)    

Перед тем, как присоединить кадры данных, я сохраняю их на диск:

df_user_ids.persist(StorageLevel.DISK_ONLY)
df_conversions.persist(StorageLevel.DISK_ONLY) 

Затем я проверяю, что число строк обоих фреймов данных согласовано:

df_user_ids.count()
>>> 584309

df_user_ids.count()
>>> 584309

df_conversions.count()
>>> 5830

df_conversions.count()
>>> 5830

И проверяю, что ключевой столбец обоих фреймов данных не содержит дубликатов:

df_user_ids.count()
>>> 584309

df_user_ids.select('user_id').distinct().count()
>>> 584309

df_conversions.count()
>>> 5830

df_conversions.select('user_id').distinct().count()
>>> 5830

ЗатемКогда я присоединяюсь к ним, я получаю непоследовательное количество строк!

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584314

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584317

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584304

Как это возможно ??

Иногда это число соединений больше, чем "df_user_ids.count ()", а иногда меньше.Я использую ноутбук Zeppelin в AWS EMR на кластере EMR для запуска этого кода.

Я уже попробовал то, что предлагается по ссылке ниже:

  • ". Persist (StorageLevel.DISK_ONLY) "не помогает.
  • Я не использую monotonically_increasing_id.

несогласованность искры при выполнении команды отсчета

1 Ответ

0 голосов
/ 05 декабря 2018

Рассматривая серию операций, которые вы выполняете на DataFrames, я думаю, что проблема связана с Join.Результаты операции соединения случайны, когда каждый узел общается с каждым другим узлом, и они обмениваются данными в соответствии с тем, какой узел имеет определенный ключ или набор ключей (к которым вы присоединяетесь).При совместном использовании данных между исполнителями, если у исполнителя нет сохраненного на диске фрейма данных, он будет повторно вычислять группу обеспечения доступности баз данных, и не гарантируется, что sampleBy возвращает ту же часть строк в фрейме данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...