Приношу извинения, если я здесь очень прост, но мне нужна небольшая помощь Pyspark в попытке динамически перезаписать разделы в таблице улья. Таблицы существенно упрощены, но проблема, с которой я борюсь, ясна (я надеюсь). Я довольно новичок в PySpark и уже несколько часов ищу в StackOverflow, чтобы наконец создать учетную запись и спросить ...! Заранее спасибо !!
У меня есть массивная многораздельная таблица кустов (HIVETABLE_TRX), построенная из фрейма данных (trx). Я извлекаю больше данных в виде фрейма данных (trxup) и хочу добавить или перезаписать соответствующие разделы в HIVETABLE_TRX.
Dataframe (trx)
+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE| TRX|
+---------------+----------+------+
| HOTEL|2019-01-01|14298 |
| HOTEL|2019-01-02|19020 |
| HOTEL|2019-01-03|18927 |
+---------------+----------+------+
trx.write \
.partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
.saveAsTable("HIVETABLE_TRX",mode='overwrite')
#Have a look at the partitioned hive table
trxchk = spark.sql("""select * from HIVETABLE_TRX""")
trxchk.show()
+------+---------------+----------+
| TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 | HOTEL|2019-01-01|
|19020 | HOTEL|2019-01-02|
|18927 | HOTEL|2019-01-03|
+------+---------------+----------+
В фрейме данных (trxup) для добавления в таблицу Hive есть одна нужная мне строка наложенияперезаписать ('HOTEL', '2019-01-03') и добавить 3 добавочных.
#Have a look at second dataframe (trxup)
+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE| TRX|
+---------------+----------+------+
| FLIGHT|2019-01-03|14410 |
| HOTEL|2019-01-03|18927 |
| FLIGHT|2019-01-04|15430 |
| HOTEL|2019-01-04|19198 |
+---------------+----------+------+
Я пытаюсь вставить trxup в HIVETABLE_TRX следующим образом:
trxup.write \
.insertInto("HIVETABLE_TRX",overwrite=True)
Насколько я понимаю, это перезапишет одну общую строку между trxup и HIVETABLE_TRX и добавит остальные.
#Have a look at HIVETABLE_TRX after the basic insertInto
trxchk2 = spark.sql("""select * from HIVETABLE_TRX""")
trxchk2.show()
+----+---------------+----------+
| TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+----+---------------+----------+
|null| 2019-01-03| 14410 |
|null| 2019-01-03| 18927 |
|null| 2019-01-04| 15430 |
|null| 2019-01-04| 19198 |
+----+---------------+----------+
Как вы видите, он не может выровнять столбцы по имени и перезаписывает все существующие разделы вHIVETABLE_TRX.
Итак: 1. Как убедиться, что столбцы выровнены для insertInto? - Это лучшее, что я мог придумать, и, несмотря на успех, не кажется, что так оно и должно быть ...?
colList = spark.sql("""select * from HIVETABLE_TRX""").columns
trxup.selectExpr(colList) \
.write \
.insertInto("HIVETABLE_TRX")
Могу ли я вставить второй df (trxup) в таблицу многораздельных кустов (HIVETABLE_TRX), просто добавив / перезаписав соответствующие разделы?
Другие вещи, которые я пробовал после долгих поисков Google, Stackoverflow и soulsearch:
Добавлены опции для интерпретатора
hive.exec.dynamic.partition = true
hive.exec.dynamic.partition.mode = nonstrict
spark.sql.sources.partitionOverwriteMode = dynamic
Попытка разбить Trxup на insertInto
trxup.write \
.partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
.insertInto("PROJECT_MERCH.AM_PARTITION_TEST_TRX",overwrite=True)
AnalysisException: u"insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;"
Удален overwrite = True из insertInto, который фактически сделал то, чтоВ этот момент я ожидал, если не то, что хочу.
+------+---------------+----------+
| TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 | HOTEL|2019-01-01|
|19020 | HOTEL|2019-01-02|
|18927 | HOTEL|2019-01-03|
| null| 2019-01-03| 14410 |
| null| 2019-01-03| 18927 |
| null| 2019-01-04| 15430 |
| null| 2019-01-04| 19198 |
+------+---------------+----------+
Я понимаю, что мог бы преобразовать trxup в таблицу многораздельных кустов (HIVETABLE_TRXUP) и затем объединить их вместе, но кажется, что это не оптимальный способсделать это - что-то вроде поражения цели разделения таблицы, нет?
trxjoined = spark.sql("""select * from HIVETABLE_TRX t full outer join HIVETABLE_TRXUP tu on t.SITE_NAME=tu.SITE_NAME and t.LOCAL_DATE=tu.LOCAL_DATE""")
spark.sql("""drop table if exists HIVETABLE_TRX""")
spark.sql("""drop table if exists HIVETABLE_TRXUP""")
trxjoined.write \
.partitionBy("SITE_NAME","LOCAL_DATE") \
.saveAsTable("HIVETABLE_TRX",mode='overwrite')