Должен ли я сохранять фрейм данных Spark, если я продолжаю добавлять в него столбцы? - PullRequest
0 голосов
/ 20 мая 2018

Я не смог найти ни одной дискуссии по теме ниже ни на одном форуме, который искал в интернете.Это может быть потому, что я новичок в Spark и Scala и не задаю действительный вопрос.Если есть какие-либо темы, обсуждающие ту же или аналогичную тему, ссылки будут очень полезны.:)

Я работаю над процессом, который использует Spark и Scala и создает файл, читая множество таблиц и получая множество полей, применяя логику к данным, извлеченным из таблиц.Итак, структура моего кода выглядит следующим образом:

val driver_sql = "SELECT ...";

var df_res = spark.sql(driver_sql)

var df_res = df_res.withColumn("Col1", <logic>)

var df_res = df_res.withColumn("Col2", <logic>)

var df_res = df_res.withColumn("Col3", <logic>)
.
.
.

var df_res = df_res.withColumn("Col20", <logic>)

По сути, существует запрос драйвера, который создает фрейм данных "driver".После этого выполняется отдельная логика (функции) на основе ключа или ключей в кадре данных драйвера для добавления новых столбцов / полей.«Логическая» часть - это не всегда однострочный код, иногда это отдельная функция, которая выполняет другой запрос, выполняет какое-то соединение с df_res и добавляет новый столбец. Количество записей также изменяется, поскольку в некоторых случаях я использую «внутреннее» объединение с другими таблицами / фреймами данных.

Итак, вот мои вопросы:

  • Должен ли ясохраняться df_res в любой момент времени?
  • Можно ли снова и снова сохранять df_res после добавления столбцов?Я имею в виду, добавляет ли это значение?
  • Если я сохраняю df_res (только диск) каждый раз, когда добавляется новый столбец, заменяются ли данные на диске?Или это создает новую копию / версию df_res на диске?
  • Есть ли лучшая техника для сохранения / кэширования данных в подобном сценарии (чтобы не делать много вещей в памяти)?

1 Ответ

0 голосов
/ 26 июня 2018

Первое, что нужно сделать, это сохранение данных, когда вы собираетесь применять итеративные операции к данным.
То, что вы делаете здесь, - это применение операции преобразования к вашим данным.Здесь нет необходимости сохранять эти кадры данных.
Например: - Сохранение будет полезно, если вы делаете что-то подобное.

val df = spark.sql("select * from ...").persist

df.count

val df1 = df.select("..").withColumn("xyz",udf(..))

df1.count

val df2 = df.select("..").withColumn("abc",udf2(..))

df2.count

Теперь, если вы сохраните df здесь, это будет полезно при расчете df1 и df2.Еще одна вещь, на которую следует обратить внимание: причина, по которой я сделал df.count, заключается в том, что датафрейм сохраняется только при применении к нему действия.Из документов Spark: «При первом вычислении в действии он будет храниться в памяти узлов».И это также отвечает на ваш второй вопрос.

Каждый раз, когда вы настаиваете, будет создаваться новая копия, но вы должны сначала отменить действие предыдущей.

...