Я только начал свой путь с потоковой передачи Spark, где я читаю данные из очереди Kafka с использованием структурированной потоковой передачи Spark. Мне нужно выполнить дедупликацию в каждой микропакете на основе ключевых столбцов (product и org) и упорядочить по полю временной отметки (booked_at).
Функция дедупликатора будет создавать микропакеты из потокового кадра данных каждые 2 секунды. Дедуплицированный фрейм данных будет возвращен другой функции для дальнейшей обработки и, наконец, сохранен в DynamoDb. Мой код работает нормально, за исключением случаев, когда в Кафке есть записи, поступающие с опозданием. Например, рассмотрим ниже 4 события в Кафке, где первые два принадлежат первому микропакету после запуска потокового задания, а последние два принадлежат второму микропакету
{"product":"p1","org":"US","quantity":1,"booked_at":"2020-02-05T00:00:05"}
{"product":"p1","org":"US","quantity":2,"booked_at":"2020-02-05T00:00:06"}
{"product":"p1","org":"US","quantity":3,"booked_at":"2020-02-05T00:00:01"}
{"product":"p1","org":"US","quantity":4,"booked_at":"2020-02-05T00:00:03"}
Токовый выход моей функции дедупликатора для каждой партии равен ниже
+-------+---+--------+-------------------+
|product|org|quantity| booked_at|
+-------+---+--------+-------------------+
| p1 | US| 3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+
+-------+---+--------+-------------------+
|product|org|quantity| booked_at|
+-------+---+--------+-------------------+
| p1 | US| 2|2020-02-05 00:00:03|
+-------+---+--------+-------------------+
Я могу с уверенностью предположить, что мои опоздавшие записи не будут превышать 60 секунд в Кафке, и, учитывая это предположение, я хочу, чтобы моя функция как-то считала, что последние 60 секунд событий делают дедупликацию. Поэтому, если записи micro-batch2 находятся в пределах окна 60 секунд, вывод должен быть ниже -
+-------+---+--------+-------------------+
|product|org|quantity| booked_at|
+-------+---+--------+-------------------+
| p1 | US| 3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+
+-------+---+--------+-------------------+
|product|org|quantity| booked_at|
+-------+---+--------+-------------------+
| p1 | US| 3|2020-02-05 00:00:06|
+-------+---+--------+-------------------+
Некоторые решения, о которых я могу подумать -
- Взять 60 секунд данные каждые 2 секунды. В настоящее время он настроен на учет только последних 2 секунд данных каждые 2 секунды. Но это, возможно, невозможно, так как чтение из Kafka будет основано на смещениях вместо временной метки kafka.
- Сохранение последних 60 секунд данных в кэшированном кадре данных и использование его при дедупликации микропакета, содержащего 2 секунды данных. Таким образом, я верну данные только за 2 секунды следующей функции, которая будет выполнять фактическую обработку.
- Оставьте функцию дедупликатора такой, какой она есть сейчас, и предоставьте другую функцию, которая в конце будет сравнивать метку времени обработанной записи с что находится в DynamoDB и будет писать только в том случае, если отметка времени последней обработанной записи больше, чем существующая в Dynamo
Есть ли другой способ, с помощью которого я могу справиться с этим лучше?
Существующий код:
def get_deduplicated_dataframe(inputDf: DataFrame, grpKeys: List[String], orderByKey: String)
: DataFrame = {
val outputDf = inputDf.withColumn("rowNumber",row_number()
.over(Window.partitionBy(grpKeys.head, grpKeys.tail: _*)
.orderBy(col(orderByKey).desc))).filter("rowNumber = 1").drop("rowNumber")
outputDf.collect()
outputDf.cache()
outputDf
}
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
val schema = new StructType()
.add("product",StringType)
.add("org",StringType)
.add("quantity", IntegerType)
.add("booked_at",TimestampType)
val payload_df = df.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")
val query = payload_df.writeStream
.trigger(Trigger.ProcessingTime(2000L))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
val groupKeys = List("product", "org")
val orderByKey = "booked_at"
val outputDf = get_deduplicated_dataframe(batchDF, groupKeys, orderByKey)
outputDf.show()
}.start()
query.awaitTermination()