Scala Code - Консолидация нескольких файлов данных одного формата за несколько дат с использованием ключевых полей и сохранение только самых последних данных - PullRequest
0 голосов
/ 05 октября 2018

Получено несколько файлов данных, которые принадлежат разным неделям - все файлы одного формата.Мне нужно объединить файлы, используя Scala-код, который работает на Spark.Конечным результатом должны быть только уникальные записи по ключу, также конечный результат должен сохранять запись из последнего файла для тех же ключевых полей.

Каждый файл данных потенциально может иметь около 1/2 миллиарда записей, и, следовательно, код должен быть быстродействующим ...

Пример:

Последний файл данных

CID PID Metric
C1  P1  10
C2  P1  20
C2  P2  30

Предыдущие данные Файл

CID PID Metric
C1  P1  20
C2  P1  30
C3  P1  40
C3  P2  50

Самый старый файл данных

CID PID Metric
C1  P1  30
C2  P1  40
C3  P1  50
C3  P2  60
C4  P1  30

Ожидаемый файл вывода

C1  P1  10
C2  P1  20
C2  P2  30
C3  P1  40
C3  P2  50
C4  P1  30

1 Ответ

0 голосов
/ 05 октября 2018

Возрастной столбец может быть назначен каждому Dataframe, затем Dataframes объединены в один, а затем используется оконная функция:

// data preparation
val columnNames = List("CID", "PID", "Metric")
val latest = List(
  ("C1", "P1", 10),
  ("C2", "P1", 20),
  ("C2", "P2", 30)
).toDF(columnNames: _*)

val previous = List(
  ("C1", "P1", 20),
  ("C2", "P1", 30),
  ("C3", "P1", 40),
  ("C3", "P2", 50)
).toDF(columnNames: _*)

val oldest = List(
  ("C1", "P1", 30),
  ("C2", "P1", 40),
  ("C3", "P1", 50),
  ("C3", "P2", 60),
  ("C4", "P1", 30)
).toDF(columnNames: _*)
// \ data preparation

val dfList = List(oldest, previous, latest)
val dfListWithIndexColumn = dfList.zipWithIndex.map { case (df, index) => df.withColumn("age", lit(index)) }
val unitedDF = dfListWithIndexColumn.reduce(_ union _)

val cidPidWindow = Window.partitionBy("CID", "PID").orderBy($"age".desc)
val result = unitedDF
  .withColumn("rank", rank.over(cidPidWindow))
  .where($"rank" === 1)
  .drop("age", "rank")

result.show(false)

Выход:

+---+---+------+
|CID|PID|Metric|
+---+---+------+
|C1 |P1 |10    |
|C2 |P1 |20    |
|C2 |P2 |30    |
|C3 |P1 |40    |
|C3 |P2 |50    |
|C4 |P1 |30    |
+---+---+------+
...