Поток записывает несколько одинаковых ключей в дельта-озеро - PullRequest
2 голосов
/ 18 июня 2020

Я записываю потоки в дельта-озеро с помощью искрового структурированного потока. Каждый пакет потоковой передачи содержит ключ-значение (также содержит метку времени в виде одного столбца). delta lake не поддерживает обновление с несколькими одинаковыми ключами в источнике (паровая партия), поэтому я хочу обновить delta lake только с записью с последней меткой времени. Как я могу это сделать?

Это фрагмент кода, который я пробую:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {

  println(s"Executing batch $batchId ...")
  microBatchOutputDF.show()

  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

Заранее спасибо.

Ответы [ 2 ]

3 голосов
/ 18 июня 2020

Вы можете удалить записи, имеющие более старую временную метку из вашего фрейма данных "microBatchOutputDF", и сохранить только запись с последней временной меткой для данного ключа.

Вы можете использовать операцию Spark 'reduceByKey' и реализовать настраиваемую функцию уменьшения, как показано ниже.

def getLatestEvents(input: DataFrame) : RDD[Row] = {
input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }


def reduceFun(x: Row, y: Row) : Row = {
if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }

Предполагаемый ключ имеет тип строки, а метка времени - метка времени. И вызовите getLatestEvents для вашего потокового пакета microBatchOutputDF. Он игнорирует более старые события с метками времени и сохраняет только последнее.

val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)

Затем вызовите операцию слияния deltalake поверх 'latestRecordsDF'

1 голос
/ 18 июня 2020

При потоковой передаче микропакета у вас может быть более одной записи для данного ключа. Чтобы обновить его целевой таблицей, вы должны выяснить последнюю запись для ключа в микропакете. В вашем случае вы можете использовать столбец max timestamp и столбец значения, чтобы найти последнюю запись и использовать ее для операции слияния.

Вы можете обратиться к этой ссылке для получения дополнительной информации о поиске последняя запись для данного ключа.

...