Как сделать добавление вставки в sparksql? - PullRequest
0 голосов
/ 28 мая 2019

У меня есть конечная точка API, написанная sparksql со следующим примером кода. Каждый раз, когда API принимает запрос, он запускает sparkSession.sql (sql_to_hive), который создает отдельный файл в HDFS. Есть ли способ сделать вставку, добавив данные в существующий файл в HDFS? Спасибо.

    sqlContext = SQLContext(sparkSession.sparkContext)
    df = sqlContext.createDataFrame(ziped_tuple_list, schema=schema)
    df.registerTempTable('TMP_TABLE')
    sql_to_hive = 'insert into log.%(table_name)s partition%(partition)s select %(title_str)s from TMP_TABLE'%{
        'table_name': table_name,
        'partition': partition_day,
        'title_str': title_str
    }
    sparkSession.sql(sql_to_hive)

Ответы [ 2 ]

0 голосов
/ 28 мая 2019

Когда вы пишете полученный фрейм данных:

result_df = sparkSession.sql (sql_to_hive)

установите его режим для добавления:

result_df.write.mode ( SaveMode.Append ).

0 голосов
/ 28 мая 2019

Я не думаю, что это возможно, чтобы добавить данные в существующий файл.

Но вы можете обойти этот случай, используя любой из этих способов

Approach1

Using Spark, write to intermediate temporary table and then insert overwrite to final table:

existing_df=spark.table("existing_hive_table") //get the current data from hive
current_df //new dataframe
union_df=existing_df.union(current_df) 
union_df.write.mode("overwrite").saveAsTable("temp_table") //write the data to temp table
temp_df=spark.table("temp_table") //get data from temp table
temp_df.repartition(<number>).write.mode("overwrite").saveAsTable("existing_hive_table") //overwrite to final table

Approach2:

Hive(not spark) предлагает перезапись и выбор той же таблицы .i.e

insert overwrite table default.t1 partition(partiton_column) 
select * from default.t1; //overwrite and select from same t1 table

Если вы идете по этому пути, то после завершения искрового задания должна быть запущена работа улья.

Hive получит блокировку при выполнении перезаписи / выбора той же таблицы, поэтому, если какое-либо задание, записывающее в таблицу, будет ожидать.

In Addition: Orc format предложит изменить таблицу сцепления , которая объединит небольшие файлы ORC для создания нового файла большего размера.

 alter table <db_name>.<orc_table_name> [partition_column="val"] concatenate;

Мы также можем использовать distributeby,sortby clauses для управления количеством файлов, для получения более подробной информации см. эту и эту ссылку.

Другой Approach3 с помощью hadoop fs -getMerge объединяет все небольшие файлы в один (этот метод works для text files и i haven't tried для orc, avro .. и т. д.).

...