Соедините два спарк Dataframe, используя вложенный столбец и обновите один из столбцов - PullRequest
0 голосов
/ 13 января 2020

Я работаю над некоторым требованием, в котором я получаю одну маленькую таблицу из файла CSV следующим образом:

root
 |-- ACCT_NO: string (nullable = true)
 |-- SUBID: integer (nullable = true)
 |-- MCODE: string (nullable = true)
 |-- NewClosedDate: timestamp (nullable = true

У нас также есть очень большая внешняя таблица улья в форме Avro, которая хранится в HDFS следующим образом:

root
-- accountlinks: array (nullable = true)
 |    |    |-- account: struct (nullable = true)
 |    |    |    |-- acctno: string (nullable = true)
 |    |    |    |-- subid: string (nullable = true)
 |    |    |    |-- mcode: string (nullable = true)
 |    |    |    |-- openeddate: string (nullable = true)
 |    |    |    |-- closeddate: string (nullable = true)

Теперь требуется поискать внешнюю таблицу кустов на основе трех столбцов из файла csv: ACCT_NO - SUBID - MCODE. Если это совпадает, обновите accountlinks.account.closeddate с NewClosedDate из файла CSV.

Я уже написал следующий код, чтобы взорвать необходимые столбцы и соединить его с небольшой таблицей, но я не совсем уверен, как обновить поле closeddate (в настоящее время оно является нулевым для всех владельцев учетной записи) с помощью NewClosedDate, потому что closeddate - это вложенный столбец, и я не могу легко использовать withColumn для его заполнения. Кроме того, имена схем и столбцов не могут быть изменены, поскольку эти файлы связаны с некоторой внешней таблицей кустов.

 val df = spark.sql("select * from db.table where archive='201711'")

val ExtractedColumn = df 
.coalesce(150)
.withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
.withColumn("SUBID", explode($"accountlinks.account.acctsubid"))
.withColumn("MCODE", explode($"C.mcode"))

val ReferenceData = spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("file.csv")

val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")

1 Ответ

2 голосов
/ 13 января 2020

Все, что вам нужно, это взорвать массив accountlinks, а затем соединить 2 кадра данных следующим образом:

val explodedDF = df.withColumn("account", explode($"accountlinks"))
val joinCondition = $"ACCT_NO" === $"account.acctno" && $"SUBID" === $"account.subid" && $"MCODE" === $"account.mcode"
val joinDF  = explodedDF.join(ReferenceData, joinCondition, "left")

Теперь вы можете обновить столбец структуры account, как показано ниже, и собрать список, чтобы получить назад структура массива:

val FinalData = joinDF.withColumn("account", 
                                  struct($"account.acctno", $"account.subid", $"account.mcode", 
                                         $"account.openeddate", $"NewClosedDate".alias("closeddate")
                                        )
                                 )
                        .groupBy().agg(collect_list($"account").alias("accountlinks"))

Идея состоит в том, чтобы создать новую структуру со всеми полями из account, за исключением closedate, которые вы получаете из столбца NewCloseDate.

Если структура содержит много полей, вы можете использовать для понимания все поля, кроме даты закрытия, чтобы не вводить их все.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...