Spark: как собрать большой объем данных без нехватки памяти - PullRequest
0 голосов
/ 08 мая 2018

У меня есть следующая проблема:

Я делаю sql-запрос по набору файлов паркета в HDFS, а затем собираю, чтобы получить результат.

Проблема в том, что при большом количестве строк возникает ошибка нехватки памяти.

Этот запрос требует тасования, поэтому я не могу выполнить запрос для каждого файла.

Одним из решений может быть перебрать значения столбца и сохранить результат на диске:

df = sql('original query goes here')
// data = collect(df) <- out of memory
createOrReplaceTempView(df, 't')
for each c in cities
    x = collect(sql("select * from t where city = c")
    append x to file

Насколько я знаю, это приведет к тому, что программа займет слишком много времени, поскольку запрос будет выполняться для каждого города.

Каков наилучший способ сделать это?

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Как сказал @cricket_007, я бы не стал collect() передавать ваши данные из Spark в файл в R. Кроме того, не имеет смысла перебирать список SparkR::distinct() городов, а затем выбирать все из этих таблиц, просто чтобы добавить их в некоторый выходной набор данных. Единственный раз, когда вы захотите это сделать, - это если вы пытаетесь выполнить другую операцию в каждой группе на основе некоторой условной логики или применить операцию к каждой группе, используя функцию, которая НЕ доступна в SparkR.

Я думаю, что вы пытаетесь получить фрейм данных (Spark или R) с наблюдениями, сгруппированными таким образом, чтобы при взгляде на них все было красиво. Для этого добавьте предложение GROUP BY city в ваш первый запрос SQL. Оттуда просто запишите данные обратно в HDFS или другой выходной каталог. Из того, что я понимаю по вашему вопросу, возможно, что-то подобное поможет:

sdf <- SparkR::sql('SELECT SOME GREAT QUERY FROM TABLE GROUP BY city')

SparkR::write.parquet(sdf, path="path/to/desired/output/location", mode="append")

Это даст вам все ваши данные в одном файле, и они должны быть сгруппированы по city, что, как я думаю, вы пытаетесь получить с помощью второго запроса в вашем вопросе.

Вы можете подтвердить, что вы хотите получить через:

newsdf<- SparkR::read.parquet(x="path/to/first/output/location/")
View(head(sdf, num=200))

Удачи, надеюсь, это поможет.

0 голосов
/ 08 мая 2018

В случае нехватки памяти, что означает, что выходные данные действительно очень большие, поэтому Вы можете записать результаты в какой-то сам файл, например, в файл паркета.

Если вы хотите в дальнейшем выполнить какую-либо операцию с этими собранными данными, вы можете прочитать данные из этого файла.

Для больших наборов данных мы не должны использовать collect(), вместо этого вы можете использовать take(100) или take(some_integer) для проверки правильности некоторых значений.

...