У меня есть таблица, подобная следующей Qubole:
use dm;
CREATE EXTERNAL TABLE IF NOT EXISTS fact (
id string,
fact_attr struct<
attr1 : String,
attr2 : String
>
)
STORED AS PARQUET
LOCATION 's3://my-bucket/DM/fact'
Я создал параллельную таблицу в Snowflake, как показано ниже:
CREATE TABLE IF NOT EXISTS dm.fact (
id string,
fact_attr variant
)
Мой процесс ETL загружает данные в таблицу Qubole, например:
+------------+--------------------------------+
| id | fact_attr |
+------------+--------------------------------+
| 1 | {"attr1": "a1", "attr2": "a2"} |
| 2 | {"attr1": "a3", "attr2": null} |
+------------+--------------------------------+
Я пытаюсь синхронизировать эти данные со снежинкой с помощью команды Merge, например
MERGE INTO DM.FACT dst USING %s src
ON dst.id = src.id
WHEN MATCHED THEN UPDATE SET
fact_attr = parse_json(src.fact_attr)
WHEN NOT MATCHED THEN INSERT (
id,
fact_attr
) VALUES (
src.id,
parse_json(src.fact_attr)
);
Я использую PySpark для синхронизации данных:
df.write \
.option("sfWarehouse", sf_warehouse) \
.option("sfDatabase", sf_database) \
.option("sfSchema", sf_schema) \
.option("postactions", query) \
.mode("overwrite") \
.snowflake("snowflake", sf_warehouse, sf_temp_table)
С помощью приведенной выше команды я получаю следующую ошибку:
pyspark.sql.utils.IllegalArgumentException: u"Don't know how to save StructField(fact_attr,StructType(StructField(attr1,StringType,true), StructField(attr2,StringType,true)),true) of type attributes to Snowflake"
Я прочитал следующие ссылки, но безуспешно:
Вопрос:
Как вставить / синхронизировать данные из таблицы Qubole Hive с полем STRUCTснежинке?