У меня есть несколько csvs с разными заголовками. некоторые из них имеют 12 столбцов, некоторые 14 столбцов и т. д .; Все они имеют первые 6 столбцов одинаково; Разница в столбцах основана на идентификаторе активности: если идентификатор активности равен 1, у нас будет определенный заголовок, если 2, у нас другой заголовок и т. Д. У меня более 70 типов действий;
Я должен объединить все это в одну большую таблицу avro, разделенную по Activityid;
Однако я не могу этого сделать.
Подход, который я принял:
1) Создайте большую таблицу avro, используя схему avro, которая содержит все отдельные столбцы во всех csvs. Первые 6 столбцов остаются без изменений;
2) Для каждого из csvs создайте фрейм данных, используя inferschema. Преобразуйте столбцы в нижний регистр и удалите все пробелы (чтобы сделать именование единообразным) и сохраните его как avro, используя activationid в качестве раздела
3) Моя идея состоит в том, что, поскольку каждый из файлов имеет столбцы, определенные в avsc, таблица должна автоматически выбрать его с пустыми значениями для других столбцов. Это потому, что avro поддерживает эволюцию схемы; Это не работает
4) Я пытался вставить в таблицу avro, используя имена столбцов - что-то вроде sql эквивалента вставки в (cola, colb, colc) выберите a, b, c из temp_view; Это также не работает; Как написать такое заявление в sparksql?
Я пробовал следующие вещи
Для (3)
df.write.mode ( "добавить") формат ( "com.databricks.spark.avro") вариант ( "заголовок", "ложь"), за исключением ( "hdfspath");... * * 1013
Для (4) sc.sparksql (s "вставить в large_avro_table (cola, colb, colc) Выбрать cola, colb, colc из temp_table");
Для (4) снова: df.select (colNames: _ *). Write.mode ("append"). InsertInto (large_avro_table)