Как обрабатывать вставки из исходной искры df в таблицу кустов, если количество столбцов отличается - PullRequest
0 голосов
/ 27 апреля 2018

Я пытаюсь записать фрейм данных pyspark в таблицу кустов, но так как у моего исходного df есть только 5 столбцов, а у цели 9 столбцов, это приводит к ошибке.

Кроме того, поскольку существует множество случаев, я не хочу ставить запросы на вставку вручную, которые могут решить эту проблему. Я ищу лучший автоматизированный способ решения этой проблемы без написания запросов для каждого случая вручную.

Я думал о создании нового df в искре, взяв исходный df и дополнительные столбцы, которые присутствуют в целевой таблице, но не в исходном df, но он работает не так, как я думал.

Вот код, над которым я работаю

#extract cols from src df and tgt df(hive table) 
src_cols = df1.columns
tgt_cols = df2.columns

#get the extra cols (diff)
extra_cols = list(set(tgt_cols) - set(src_cols))
#extra_cols = ['state', 'datetime', 'zipcode', 'type']

#formulate the string to add extra cols
string = ""
for item in extra_cols:
    string += str(".withColumn(\""+item+"\", lit(\"NULL\"))")

Это выведет требуемую строку, которую я могу использовать для нового df

#'.withColumn("state", lit(NULL)).withColumn("datetime", lit(NULL)).withColumn("zipcode", lit(NULL)).withColumn("type", lit(NULL))'


new_df = "df1" + string
#'df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))'

Проблема в том, что я не могу выполнить код df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL")), так как это строка

Может кто-нибудь помочь мне разобраться с этим сценарием гораздо лучше.

Спасибо.

1 Ответ

0 голосов
/ 27 апреля 2018

Если вы определили список различий в именах столбцов как

#extra_cols = ['state', 'datetime', 'zipcode', 'type']

Тогда вам не нужно формулировать строку для добавления дополнительных столбцов , вы можете просто использовать функцию reduce, чтобы применить .withColumn к списку имен столбцов как

import pyspark.sql.functions as f
to_be_written_df = reduce(lambda temp_df, col_name: temp_df.withColumn(col_name, f.lit('NULL')), extra_cols, df1)

Это должно решить вашу проблему

...