Я работаю над некоторым требованием, в котором я получаю одну маленькую таблицу из файла CSV следующим образом:
root
|-- ACCT_NO: string (nullable = true)
|-- SUBID: integer (nullable = true)
|-- MCODE: string (nullable = true)
|-- NewClosedDate: timestamp (nullable = true
У нас также есть очень большая внешняя таблица улья в форме Avro, которая хранится в HDFS следующим образом:
root
-- accountlinks: array (nullable = true)
| | |-- account: struct (nullable = true)
| | | |-- acctno: string (nullable = true)
| | | |-- subid: string (nullable = true)
| | | |-- mcode: string (nullable = true)
| | | |-- openeddate: string (nullable = true)
| | | |-- closeddate: string (nullable = true)
Теперь требуется поискать внешнюю таблицу кустов на основе трех столбцов из файла csv: ACCT_NO - SUBID - MCODE
. Если это совпадает, обновите accountlinks.account.closeddate
с NewClosedDate
из файла CSV.
Я уже написал следующий код, чтобы взорвать необходимые столбцы и соединить его с небольшой таблицей, но я не совсем уверен, как обновить поле closeddate (в настоящее время оно является нулевым для всех владельцев учетной записи) с помощью NewClosedDate, потому что closeddate - это вложенный столбец, и я не могу легко использовать withColumn для его заполнения. Кроме того, имена схем и столбцов не могут быть изменены, поскольку эти файлы связаны с некоторой внешней таблицей кустов.
val df = spark.sql("select * from db.table where archive='201711'")
val ExtractedColumn = df
.coalesce(150)
.withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
.withColumn("SUBID", explode($"accountlinks.account.acctsubid"))
.withColumn("MCODE", explode($"C.mcode"))
val ReferenceData = spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("file.csv")
val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")