Мы написали пакетное приложение для искры (версия Spark: 2.3.0). Код выглядит следующим образом.
Преобразование: Dataset<CollectionFlattenedData> collectionDataDS = flatMap
(функция, которая анализирует некоторые файлы и возвращает нам набор данных);Этот набор данных будет иметь три типа данных, которые различаются по типу записи столбца: 1,2,3.
Загрузка в временную таблицу : collectionDataDS.createOrReplaceTempView(TEMP_TABLE);
Создание временного представления набора данных.
Action1 : sparkSession.sql("INSERT INTO TABLE1 SELECT COL1,COL2,COL3 FROM TEMP_TABLE WHERE recordtype='1'");
запрос улья для загрузки таблицы TABLE1 из временной таблицы.
Action2 : sparkSession.sql("INSERT INTO TABLE2 SELECT COL4,COL5,COL6 FROM TEMP_TABLE WHERE recordtype='2'");
запрос улья для загрузки TABLE2 из временной таблицытаблица.
Action3 : sparkSession.sql("INSERT INTO TABLE2 SELECT COL7,COL8,COL9 FROM TEMP_TABLE WHERE recordtype='3'");
запрос куста для загрузки таблицы ERROR
Что происходит: потому что мы выполняем 3 запроса, которые являются ничем иным, как отдельными действиями, преобразование плоской картывызывается три раза (один раз за одно действие). Но наше требование заключается в том, что мы должны вызывать операцию плоской карты только один раз.
Код pojo CollectionFlatteredData выглядит примерно так:
public class CollectionFlattenedData implements Serializable {
private String recordtype;
private String COL1;
private String COL2;
private String COL3;
private String COL4;
private String COL5;
private String COL6;
private String COL7;
private String COL8;
private String COL9;
//getters and setters of all the columns
}
Есть ли в любом случае, мы можем это сделать. Ранний ответ высоко ценится.