Pyspark: вставьте фрейм данных в таблицу разделов кустов - PullRequest
0 голосов
/ 14 октября 2019

Приношу извинения, если я здесь очень прост, но мне нужна небольшая помощь Pyspark в попытке динамически перезаписать разделы в таблице улья. Таблицы существенно упрощены, но проблема, с которой я борюсь, ясна (я надеюсь). Я довольно новичок в PySpark и уже несколько часов ищу в StackOverflow, чтобы наконец создать учетную запись и спросить ...! Заранее спасибо !!

У меня есть массивная многораздельная таблица кустов (HIVETABLE_TRX), построенная из фрейма данных (trx). Я извлекаю больше данных в виде фрейма данных (trxup) и хочу добавить или перезаписать соответствующие разделы в HIVETABLE_TRX.

Dataframe (trx)

+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE|   TRX|
+---------------+----------+------+
|          HOTEL|2019-01-01|14298 |
|          HOTEL|2019-01-02|19020 |
|          HOTEL|2019-01-03|18927 |
+---------------+----------+------+

trx.write \
    .partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
    .saveAsTable("HIVETABLE_TRX",mode='overwrite')

#Have a look at the partitioned hive table
trxchk = spark.sql("""select * from HIVETABLE_TRX""")
trxchk.show()

+------+---------------+----------+
|   TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 |          HOTEL|2019-01-01|
|19020 |          HOTEL|2019-01-02|
|18927 |          HOTEL|2019-01-03|
+------+---------------+----------+

В фрейме данных (trxup) для добавления в таблицу Hive есть одна нужная мне строка наложенияперезаписать ('HOTEL', '2019-01-03') и добавить 3 добавочных.

#Have a look at second dataframe (trxup)
+---------------+----------+------+
|PRODUCT_LN_NAME|LOCAL_DATE|   TRX|
+---------------+----------+------+
|         FLIGHT|2019-01-03|14410 |
|          HOTEL|2019-01-03|18927 |
|         FLIGHT|2019-01-04|15430 |
|          HOTEL|2019-01-04|19198 |
+---------------+----------+------+

Я пытаюсь вставить trxup в HIVETABLE_TRX следующим образом:

trxup.write \
    .insertInto("HIVETABLE_TRX",overwrite=True)

Насколько я понимаю, это перезапишет одну общую строку между trxup и HIVETABLE_TRX и добавит остальные.

#Have a look at HIVETABLE_TRX after the basic insertInto
trxchk2 = spark.sql("""select * from HIVETABLE_TRX""")
trxchk2.show()

+----+---------------+----------+
| TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+----+---------------+----------+
|null|     2019-01-03|    14410 |
|null|     2019-01-03|    18927 |
|null|     2019-01-04|    15430 |
|null|     2019-01-04|    19198 |
+----+---------------+----------+

Как вы видите, он не может выровнять столбцы по имени и перезаписывает все существующие разделы вHIVETABLE_TRX.

Итак: 1. Как убедиться, что столбцы выровнены для insertInto? - Это лучшее, что я мог придумать, и, несмотря на успех, не кажется, что так оно и должно быть ...?

colList = spark.sql("""select * from HIVETABLE_TRX""").columns
trxup.selectExpr(colList) \
    .write \
    .insertInto("HIVETABLE_TRX")
Могу ли я вставить второй df (trxup) в таблицу многораздельных кустов (HIVETABLE_TRX), просто добавив / перезаписав соответствующие разделы?

Другие вещи, которые я пробовал после долгих поисков Google, Stackoverflow и soulsearch:

Добавлены опции для интерпретатора

hive.exec.dynamic.partition = true
hive.exec.dynamic.partition.mode = nonstrict
spark.sql.sources.partitionOverwriteMode = dynamic

Попытка разбить Trxup на insertInto

trxup.write \
    .partitionBy("PRODUCT_LN_NAME","LOCAL_DATE") \
    .insertInto("PROJECT_MERCH.AM_PARTITION_TEST_TRX",overwrite=True)

AnalysisException: u"insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;"

Удален overwrite = True из insertInto, который фактически сделал то, чтоВ этот момент я ожидал, если не то, что хочу.

+------+---------------+----------+
|   TRX|PRODUCT_LN_NAME|LOCAL_DATE|
+------+---------------+----------+
|14298 |          HOTEL|2019-01-01|
|19020 |          HOTEL|2019-01-02|
|18927 |          HOTEL|2019-01-03|
|  null|     2019-01-03|    14410 |
|  null|     2019-01-03|    18927 |
|  null|     2019-01-04|    15430 |
|  null|     2019-01-04|    19198 |
+------+---------------+----------+

Я понимаю, что мог бы преобразовать trxup в таблицу многораздельных кустов (HIVETABLE_TRXUP) и затем объединить их вместе, но кажется, что это не оптимальный способсделать это - что-то вроде поражения цели разделения таблицы, нет?

trxjoined = spark.sql("""select * from HIVETABLE_TRX t full outer join HIVETABLE_TRXUP tu on t.SITE_NAME=tu.SITE_NAME and t.LOCAL_DATE=tu.LOCAL_DATE""")
spark.sql("""drop table if exists HIVETABLE_TRX""")
spark.sql("""drop table if exists HIVETABLE_TRXUP""")
trxjoined.write \
    .partitionBy("SITE_NAME","LOCAL_DATE") \
    .saveAsTable("HIVETABLE_TRX",mode='overwrite')
...