Каков правильный подход к самостоятельному присоединению к потоку из Kafka для создания пар смежных записей на основе ключа?
Лог c состоит в том, чтобы найти событие, которое произошло ранее для того же самого ключевые значения и в течение некоторого короткого промежутка времени. Каждая запись имеет уникальный идентификатор, который помогает удалять дубликаты.
Например, это может быть поиск предыдущего действия пользователя в потоке журналов, ключ здесь будет user_id, а ограничение по времени будет ~ 5 мин.
например, серия событий для одного пользователя:
a
, b
, c
, d
в идеале должно выдавать (при условии, что все в течение 5 минут):
(a,b)
, (b,c)
, (c,d)
Проблема состоит в том, чтобы избежать генерации всех возможных пар ( (n-1)! из них), т.е.:
(a,b)
, (a,c)
, (a,d)
, (b,c)
, (b, d)
, (c,d)
Простое (не идеальное) решение:
- объединить по ключу (
first.key = second.key
) - фильтр по меткам времени (
first.timestamp < second.timestamp AND first.timestamp >= second.timestamp - interval 5 minutes
) - удалить дубликаты на основе
second.id
(или first.id
), чтобы избавиться от лишних пар
Недостатки:
- [основной] он создает промежуточную стадию, где генерируются все комбинации (шаг 2) и их необходимо перемещать между этапами, это, как представляется, приводит к задержкам и из-за сбоев памяти
Другая информация:
- нет никакого контроля над тем, сколько сообщений для одного ключа существует (n! количество всех пар - реальная проблема)
- переупорядоченные или повторяющиеся входящие сообщения редки, но возможно
- входящие данные Kafka уже разбиты по значению ключа
Вот иллюстрация концепции (упрощенный код простого решения выше):
Dataset<Row> in = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaConnection)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
.filter("timestamp >= current_timestamp() - INTERVAL 20 minutes") // too old, ignore
.select(from_json(deserializeUdf.apply(col("value")), structType).alias("value")) // deserialize data to json
.filter("value.Timestamp >= current_timestamp() - INTERVAL 20 minutes") // too old, ignore
;
Dataset<Row> joined =
in.as("first")
.join(in.as("second"), expr(
"first.value.Key = second.value.Key"
+ " AND first.value.Timestamp < second.value.Timestamp"
+ " AND first.value.Timestamp >= second.value.Timestamp - interval 5 minutes"
))
.select(col("second.value.id"), col("second.value.timestamp"), col("first.value").alias("a"), col("second.value").alias("b"))
.withWatermark("timestamp", "20 minutes")
.dropDuplicates("id", "timestamp")
;
DataStreamWriter writer =
joined.select(to_json(struct(col("*"))).alias("value"))
.writeStream()
.format("kafka")
.outputMode(OutputMode.Append())
.option("kafka.bootstrap.servers", outKafkaConnection)
.option("serializer.encoding", "UTF8")
.option("topic", outTopic)
.trigger(Trigger.ProcessingTime("30 seconds"));
StreamingQuery query = writer.start();