Сохранение фреймов данных с очень большими значениями в Spark - PullRequest
0 голосов
/ 15 мая 2018

Используя фрейм данных Spark, я выполняю операцию groupBy, чтобы собрать все значения, связанные с ключом, в список.Размер собранных значений может сильно различаться.На самом деле я пытаюсь сгенерировать «документы» путем объединения значений составного ключа для постобработки.

Для иллюстрации: df - это кадр данных с 3 строковыми столбцами A, B, C.

df.groupBy(concat($"A", lit("-"), $"B").alias("Key")).agg(collect_list($"C").alias("values"))

Выполнение этого запроса для извлечения пары строк работает, что означает, что команда верна.

Однако, когда я пытаюсь сохранить полный вывод в виде сжатого CSV, илиПаркет, этот процесс завершается неудачей по нескольким причинам, в том числе из-за проблем с памятью (которые я пытался настроить) и криптосерилизации.

Я подозреваю, что проблема заключается в том, что некоторые значения чрезвычайно велики.Есть ли лучшая практика для таких случаев?

1 Ответ

0 голосов
/ 15 мая 2018

Хотя точно знать, какую ошибку вы получите, ваша проблема весьма вероятна из-за того, что слишком много данных попадает в одну строку.Чтобы преодолеть это, вы можете добавить искусственное разбиение со случайным столбцом.Таким образом, ваши сгруппированные данные будут разделены между несколькими строками, а затем и несколькими файлами, предотвращая возникновение ошибок OOM.Вот как вы могли бы это сделать:

val df = sc.parallelize(Seq((1, 2, 3), (1, 2, 4), (1, 2, 5), (1, 3, 2)))
    .toDF("A", "B", "C")

То, что вы пытаетесь сделать, это, и C может стать слишком большим.

df.groupBy("A", "B")
  .agg(collect_list('C))
  .show
+---+---+---------------+                                                       
|  A|  B|collect_list(C)|
+---+---+---------------+
|  1|  2|      [3, 4, 5]|
|  1|  3|            [2]|
+---+---+---------------+

Вместо этого вы можете добавить случайный столбецR, чтобы предотвратить получение C слишком большим.Данные будут разбиты по нескольким значениям R, уменьшая возможный объем данных, которые можно поместить в один ряд.Здесь я использовал 3 возможных случайных значения для примера, но в вашем случае должно быть необходимо большее значение.

val new_df = df.withColumn("R", floor(rand()*3)).groupBy("A", "B", "R").agg(collect_list('C) as "C").show
new_df.show
+---+---+---+------+
|  A|  B|  R|     C|
+---+---+---+------+
|  1|  2|  1|   [5]|
|  1|  2|  2|[3, 4]|
|  1|  3|  1|   [2]|
+---+---+---+------+

Затем вы можете записать свой секционированный кадр данных следующим образом.

new_df.write.partitionBy("A", "B", "R").parquet("...")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...