Как обрабатывать записи о поздних поступлениях при дедупликации в структурированной потоковой передаче Spark? - PullRequest
0 голосов
/ 17 февраля 2020

Я только начал свой путь с потоковой передачи Spark, где я читаю данные из очереди Kafka с использованием структурированной потоковой передачи Spark. Мне нужно выполнить дедупликацию в каждой микропакете на основе ключевых столбцов (product и org) и упорядочить по полю временной отметки (booked_at).

Функция дедупликатора будет создавать микропакеты из потокового кадра данных каждые 2 секунды. Дедуплицированный фрейм данных будет возвращен другой функции для дальнейшей обработки и, наконец, сохранен в DynamoDb. Мой код работает нормально, за исключением случаев, когда в Кафке есть записи, поступающие с опозданием. Например, рассмотрим ниже 4 события в Кафке, где первые два принадлежат первому микропакету после запуска потокового задания, а последние два принадлежат второму микропакету

{"product":"p1","org":"US","quantity":1,"booked_at":"2020-02-05T00:00:05"}
{"product":"p1","org":"US","quantity":2,"booked_at":"2020-02-05T00:00:06"}
{"product":"p1","org":"US","quantity":3,"booked_at":"2020-02-05T00:00:01"}
{"product":"p1","org":"US","quantity":4,"booked_at":"2020-02-05T00:00:03"}

Токовый выход моей функции дедупликатора для каждой партии равен ниже

+-------+---+--------+-------------------+
|product|org|quantity|          booked_at|
+-------+---+--------+-------------------+
| p1    | US|       3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+

+-------+---+--------+-------------------+
|product|org|quantity|          booked_at|
+-------+---+--------+-------------------+
| p1    | US|       2|2020-02-05 00:00:03|
+-------+---+--------+-------------------+

Я могу с уверенностью предположить, что мои опоздавшие записи не будут превышать 60 секунд в Кафке, и, учитывая это предположение, я хочу, чтобы моя функция как-то считала, что последние 60 секунд событий делают дедупликацию. Поэтому, если записи micro-batch2 находятся в пределах окна 60 секунд, вывод должен быть ниже -

+-------+---+--------+-------------------+
|product|org|quantity|          booked_at|
+-------+---+--------+-------------------+
| p1    | US|       3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+

+-------+---+--------+-------------------+
|product|org|quantity|          booked_at|
+-------+---+--------+-------------------+
| p1    | US|       3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+

Некоторые решения, о которых я могу подумать -

  1. Взять 60 секунд данные каждые 2 секунды. В настоящее время он настроен на учет только последних 2 секунд данных каждые 2 секунды. Но это, возможно, невозможно, так как чтение из Kafka будет основано на смещениях вместо временной метки kafka.
  2. Сохранение последних 60 секунд данных в кэшированном кадре данных и использование его при дедупликации микропакета, содержащего 2 секунды данных. Таким образом, я верну данные только за 2 секунды следующей функции, которая будет выполнять фактическую обработку.
  3. Оставьте функцию дедупликатора такой, какой она есть сейчас, и предоставьте другую функцию, которая в конце будет сравнивать метку времени обработанной записи с что находится в DynamoDB и будет писать только в том случае, если отметка времени последней обработанной записи больше, чем существующая в Dynamo

Есть ли другой способ, с помощью которого я могу справиться с этим лучше?

Существующий код:

    def get_deduplicated_dataframe(inputDf: DataFrame, grpKeys: List[String], orderByKey: String) 
  : DataFrame = {
    val outputDf = inputDf.withColumn("rowNumber",row_number()
      .over(Window.partitionBy(grpKeys.head, grpKeys.tail: _*)
        .orderBy(col(orderByKey).desc))).filter("rowNumber = 1").drop("rowNumber")
    outputDf.collect()
    outputDf.cache()
    outputDf
  }

val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test1")
    .option("startingOffsets", "latest")
  .load()


  val schema = new StructType()
    .add("product",StringType)
    .add("org",StringType)
    .add("quantity", IntegerType)
    .add("booked_at",TimestampType)

  val payload_df = df.selectExpr("CAST(value AS STRING) as payload")
    .select(from_json(col("payload"), schema).as("data"))
    .select("data.*")

  val query = payload_df.writeStream
    .trigger(Trigger.ProcessingTime(2000L))
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>

      val groupKeys = List("product", "org")
      val orderByKey = "booked_at"
      val outputDf = get_deduplicated_dataframe(batchDF, groupKeys, orderByKey)
      outputDf.show()
    }.start()


  query.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...