Объединение двух DataFrame с использованием spark 2.x с различными schema / dataTypes - PullRequest
0 голосов
/ 01 сентября 2018

Я пытаюсь объединить несколько таблиц улья, используя spark, где некоторые столбцы с одинаковыми именами имеют разные типы данных, особенно string и bigint.

Моя финальная таблица (hiveDF) должна иметь следующую схему:

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | bigint     |          |
| approvalstatus           | string     |          |
| capitalrate              | double     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | bigint     |          |
| divfeerate               | double     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | double     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | double     |          |
| qualifiedratepercent     | double     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | double     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | bigint     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

Эта схема финальной таблицы (hiveDF) может быть сделана с помощью ниже JSON-

{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}

Я делаю что-то вроде ниже-

var hiveDF = spark.sqlContext.sql("select * from final_destination_tableName")
var newDataDF = spark.sqlContext.sql("select * from incremental_table_1 where id > 866000")

Моя инкрементная таблица (newDataDF) содержит несколько столбцов с разными типами данных. У меня есть около 10 инкрементных таблиц, где где-то bigint и в другой таблице то же самое, что и в строке, так что не могу быть уверен, если я сделаю Typecast. Typecast может быть простым, но я не уверен, какой тип мне следует делать, поскольку существует несколько таблиц. Я ищу любой подход, где без typecast я могу обойтись.

Например, для инкрементальной таблицы показано что-то вроде ниже-

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | string     |          |
| approvalstatus           | string     |          |
| capitalrate              | string     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | string     |          |
| divfeerate               | string     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | string     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | string     |          |
| qualifiedratepercent     | string     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | string     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | string     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

Я делаю этот союз для таблицы что-то вроде ниже-

var combinedDF = hiveDF.unionAll(newDataDF)

но не повезло. Я попытался дать окончательную схему, как показано ниже, но безуспешно -

val rows = newDataDF.rdd
val newDataDF2 = spark.sqlContext.createDataFrame(rows, hiveDF.schema)
var combinedDF = hiveDF.unionAll(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")

Согласно это , я пробовал ниже-

var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)

Наконец, я пытаюсь записать в таблицу, как указано выше, но не повезло, пожалуйста, помогите мне-

1 Ответ

0 голосов
/ 02 сентября 2018

Я также сталкивался с этой ситуацией при объединении инкрементной таблицы с существующей таблицей. Обычно есть 2 случая для обработки

1. Добавочные данные с дополнительным столбцом:

Это можно решить с помощью обычного процесса слияния, который вы пытаетесь здесь.

2. Добавочные данные с тем же именем столбца, но с другой схемой:

Это хитрый. Одним из простых решений является преобразование данных бота в toJSON и объединение hiveDF.toJSON.union(newDataDF.toJSON). Это, однако, приведет к слиянию схемы json и изменит существующую схему. Например: если столбец a:Long в таблице и a:String в инкрементной таблице, после объединения окончательной схемой будет: String. Невозможно уклониться от этого, если вы хотите сделать JSON Union.

Альтернативой этому является строгая проверка схемы для дополнительных данных. Вы проверяете, имеет ли инкрементная таблица ту же схему, что и таблица кустов, если схема отличается, не объединяйте.

Это, однако, немного слишком строго, так как для данных в реальном времени довольно сложно применить схемы.

Так что, как я решил, это иметь отдельный процесс обогащения перед слиянием. Процесс фактически проверяет схему и, если входящий столбец может быть обновлен / понижен до текущей схемы таблицы кустов, он делает это.

По сути, он перебирает входящую дельту, для каждой строки преобразует это в правильную схему. Это немного дорого, но дает очень хорошую гарантию правильности данных. В случае, если процесс не может преобразовать строку. Я ограничиваю строку и поднимаю тревогу, чтобы данные могли быть проверены вручную для любой ошибки в вышестоящей системе, которая генерирует данные.

Это код , который я использую, чтобы проверить, можно ли объединить две схемы.

...