Построение окна на основе меток времени сообщения в Spark DStream - PullRequest
1 голос
/ 15 июня 2019

Я получаю DStream от Кафки и хочу сгруппировать все сообщения в каком-то скользящем окне по ключам.

Дело в том, что это окно должно основываться на временных отметках, предоставленных в каждом сообщении (отдельное поле):

Message structure
--------------------------
key1, ..., ..., 1557678233
key1, ..., ..., 1557678234 
key2, ..., ..., 1557678235 

Итак, я хочу рассмотреть сообщения, где для каждого ключа timestamp of the first message - timestamp of the last message <= 5 минут </p>

Как я вижу из этого вопроса , это невозможно, поскольку Spark считает только системное время для событий. Парень там предлагает использовать updateStateByKey, что для меня не очень понятно ...

Может быть, мы могли бы достичь этого, используя другой подход?

Как насчет включения различий временной метки в combiners для функции combineByKey с дальнейшим суммированием и фильтрацией по порогу длительностей?

Пожалуйста, добавьте свои мысли по этому поводу или поделитесь своим решением, если у вас была возможность столкнуться с той же проблемой ...

Спасибо!

Ответы [ 2 ]

1 голос
/ 22 июня 2019

Возможно ли это?Без сомнения. Apache Beam , который, помимо прочего, предоставляет Бэкэнд Apache Spark может легко обрабатывать такие операции.

Однако это определенно не то, что вы хотите реализовать самостоятельно, если у вас нет значительныхресурсы для развития и много ноу-хау в вашем распоряжении.И если бы вы имели, вы, вероятно, не задали бы этот вопрос в первую очередь.

Обработка поздних событий, событий не по порядку и восстановление после сбоев узлов в лучшем случае может быть сложным, с большим числомкрайние случаи.

Более того, прежде чем вы на самом деле его внедрите, он устареет - DStream уже считается устаревшим API и, скорее всего, рано или поздно достигнет своего срока службы.В то же время Структурированная потоковая передача уже может обрабатывать временные окна событий "из коробки".

0 голосов
/ 26 июня 2019

Протестировано с приведенными ниже примерами данных, и я предполагаю, что отметка времени в формате эпохи -

[key1, ..., ..., 1557678233]
[key1, ..., ..., 1557678234]
[key2, ..., ..., 1557678235]
[key2, ..., ..., 1557678240]
[key2, ..., ..., 1557678271]
[key3, ..., ..., 1557678635]
[key3, ..., ..., 1557678636]
[key3, ..., ..., 1557678637]
[key3, ..., ..., 1557678638]
[key3, ..., ..., 1557678999]

// - создать udf для возврата, если запись должна быть обработана или отклонена

scala> spark.udf.register("recordStatusUDF", (ts:String) => {
     |     val ts_array = ts.split(",",-1)
     |     if ((ts_array.max.trim.toLong - ts_array.min.trim.toLong) <= 300) {
     |        "process"
     |     }
     |     else { "reject" }
     | })
res83: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

// - Создать схему

scala> val schema = StructType(Seq(StructField("key", StringType, true),StructField("col2", StringType, true),StructField("col3", StringType, true),StructField("epoch_ts", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(epoch_ts,StringType,true))

// - Создать фрейм данных

scala> spark.createDataFrame(rdd,schema).createOrReplaceTempView("kafka_messages")


scala> spark.sql(s""" select x.key, recordStatusUDF(x.ts) as action_ind from ( select key, concat_ws(",", collect_list(epoch_ts)) as ts from kafka_messages group by key)x """).createOrReplaceTempView("action")

scala> val result = spark.sql(s""" select km.* from kafka_messages km inner join action ac on km.key = ac.key and ac.action_ind = "process" """)
result: org.apache.spark.sql.DataFrame = [key: string, col2: string ... 2 more fields]

scala> result.show(false)
+----+----+----+-----------+
|key |col2|col3|epoch_ts   |
+----+----+----+-----------+
|key1| ...| ...| 1557678233|
|key1| ...| ...| 1557678234|
|key2| ...| ...| 1557678235|
|key2| ...| ...| 1557678240|
|key2| ...| ...| 1557678271|
+----+----+----+-----------+

Вы можете использовать указанный выше код для каждого rdd (сообщения kafka). Надеюсь, что это полезно.

...