объединить несколько CSV в одну большую таблицу AVRO - PullRequest
0 голосов
/ 08 января 2019

У меня есть несколько csvs с разными заголовками. некоторые из них имеют 12 столбцов, некоторые 14 столбцов и т. д .; Все они имеют первые 6 столбцов одинаково; Разница в столбцах основана на идентификаторе активности: если идентификатор активности равен 1, у нас будет определенный заголовок, если 2, у нас другой заголовок и т. Д. У меня более 70 типов действий;

Я должен объединить все это в одну большую таблицу avro, разделенную по Activityid; Однако я не могу этого сделать.

Подход, который я принял: 1) Создайте большую таблицу avro, используя схему avro, которая содержит все отдельные столбцы во всех csvs. Первые 6 столбцов остаются без изменений;

2) Для каждого из csvs создайте фрейм данных, используя inferschema. Преобразуйте столбцы в нижний регистр и удалите все пробелы (чтобы сделать именование единообразным) и сохраните его как avro, используя activationid в качестве раздела

3) Моя идея состоит в том, что, поскольку каждый из файлов имеет столбцы, определенные в avsc, таблица должна автоматически выбрать его с пустыми значениями для других столбцов. Это потому, что avro поддерживает эволюцию схемы; Это не работает

4) Я пытался вставить в таблицу avro, используя имена столбцов - что-то вроде sql эквивалента вставки в (cola, colb, colc) выберите a, b, c из temp_view; Это также не работает; Как написать такое заявление в sparksql?

Я пробовал следующие вещи Для (3) df.write.mode ( "добавить") формат ( "com.databricks.spark.avro") вариант ( "заголовок", "ложь"), за исключением ( "hdfspath");... * * 1013

Для (4) sc.sparksql (s "вставить в large_avro_table (cola, colb, colc) Выбрать cola, colb, colc из temp_table");

Для (4) снова: df.select (colNames: _ *). Write.mode ("append"). InsertInto (large_avro_table)

...