У меня есть некоторые опасения относительно поведения фреймов данных после их записи в таблицы Hive.
Контекст : Я запускаю Spark Scala (версия 2.2.0.2.6.4. 105-1) через spark-submit
в моей производственной среде, которая имеет Had oop 2. Я выполняю несколько вычислений и сохраняю некоторые промежуточные данные в Hive OR C таблицах; после сохранения таблицы мне нужно повторно использовать фрейм данных для вычисления нового фрейма данных, который будет сохранен в другой таблице Hive OR C.
Например:
// dataframe with ~10 million record
val df = prev_df.filter(some_filters)
val df_temp_table_name = "temp_table"
val df_table_name = "table"
sql("SET hive.exec.dynamic.partition = true")
sql("SET hive.exec.dynamic.partition.mode = nonstrict")
df.createOrReplaceTempView(df_temp_table_name)
sql(s"""INSERT OVERWRITE TABLE $df_table_name PARTITION(partition_timestamp)
SELECT * FROM $df_temp_table_name """)
Эти шаги всегда работа, и таблица будет правильно заполнена правильными данными и разделами.
После этого мне нужно использовать только что вычисленный фрейм данных (df
) для обновления другой таблицы. Поэтому я запрашиваю обновление таблицы в фрейм данных df2
, затем присоединяю df
к df2
, и результат соединения должен перезаписать таблицу df2
(простая, несекционированная таблица).
val table_name_to_be_updated = "table2"
// Query the table to be updated
val df2 = sql(table_name_to_be_updated)
val df3 = df.join(df2).filter(some_filters).withColumn(something)
val temp = "temp_table2"
df3.createOrReplaceTempView(temp)
sql(s"""INSERT OVERWRITE TABLE $table_name_to_be_updated
SELECT * FROM $temp """)
На этом этапе df3
всегда считается пустым, поэтому результирующая таблица Hive также всегда пуста. Это происходит также, когда я .persist()
сохраняю его в памяти.
При тестировании с spark-shell
я никогда не сталкивался с проблемой. Это происходит только тогда, когда поток запланирован в cluster-mode
под Ooz ie.
Как вы думаете, в чем может быть проблема? Есть ли у вас какие-либо советы по решению такой проблемы с эффективным использованием памяти?
Я не понимаю, становится ли это первым df
пустым после записи в таблицу, или проблема в том, что я сначала запрашиваю, а затем пытаюсь перезаписать ту же таблицу.
Заранее большое спасибо и удачного дня!
Изменить:
Раньше df
вычислялся в отдельном скрипте, а затем вставлялся в соответствующий стол. Во втором сценарии эта таблица была запрошена в новую переменную df
; затем table_to_be_updated также был запрошен и сохранен, скажем, в переменной old_df2
. Затем они были объединены и вычислены в новой переменной df3
, которая затем была вставлена с перезаписью в table_to_be_updated.