Поток Spark + Kafka, лучший способ самостоятельного объединения пары соседних записей - PullRequest
0 голосов
/ 20 апреля 2020

Каков правильный подход к самостоятельному присоединению к потоку из Kafka для создания пар смежных записей на основе ключа?

Лог c состоит в том, чтобы найти событие, которое произошло ранее для того же самого ключевые значения и в течение некоторого короткого промежутка времени. Каждая запись имеет уникальный идентификатор, который помогает удалять дубликаты.

Например, это может быть поиск предыдущего действия пользователя в потоке журналов, ключ здесь будет user_id, а ограничение по времени будет ~ 5 мин.

например, серия событий для одного пользователя:

a, b, c, d

в идеале должно выдавать (при условии, что все в течение 5 минут):

(a,b), (b,c), (c,d)

Проблема состоит в том, чтобы избежать генерации всех возможных пар ( (n-1)! из них), т.е.:

(a,b), (a,c), (a,d), (b,c), (b, d), (c,d)

Простое (не идеальное) решение:

  1. объединить по ключу (first.key = second.key)
  2. фильтр по меткам времени (first.timestamp < second.timestamp AND first.timestamp >= second.timestamp - interval 5 minutes)
  3. удалить дубликаты на основе second.id (или first.id), чтобы избавиться от лишних пар

Недостатки:

  • [основной] он создает промежуточную стадию, где генерируются все комбинации (шаг 2) и их необходимо перемещать между этапами, это, как представляется, приводит к задержкам и из-за сбоев памяти

Другая информация:

  • нет никакого контроля над тем, сколько сообщений для одного ключа существует (n! количество всех пар - реальная проблема)
  • переупорядоченные или повторяющиеся входящие сообщения редки, но возможно
  • входящие данные Kafka уже разбиты по значению ключа

Вот иллюстрация концепции (упрощенный код простого решения выше):

Dataset<Row> in = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaConnection)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .filter("timestamp >= current_timestamp() - INTERVAL 20 minutes") // too old, ignore
    .select(from_json(deserializeUdf.apply(col("value")), structType).alias("value")) // deserialize data to json
    .filter("value.Timestamp >= current_timestamp() - INTERVAL 20 minutes") // too old, ignore
    ;

Dataset<Row> joined =
    in.as("first")
    .join(in.as("second"), expr(
        "first.value.Key = second.value.Key"
        + " AND first.value.Timestamp < second.value.Timestamp"
        + " AND first.value.Timestamp >= second.value.Timestamp - interval 5 minutes"
    ))
    .select(col("second.value.id"), col("second.value.timestamp"), col("first.value").alias("a"), col("second.value").alias("b"))
    .withWatermark("timestamp", "20 minutes")
    .dropDuplicates("id", "timestamp")
    ;

DataStreamWriter writer =
    joined.select(to_json(struct(col("*"))).alias("value"))
        .writeStream()
        .format("kafka")
        .outputMode(OutputMode.Append())
        .option("kafka.bootstrap.servers", outKafkaConnection)
        .option("serializer.encoding", "UTF8")
        .option("topic", outTopic)
        .trigger(Trigger.ProcessingTime("30 seconds"));

StreamingQuery query = writer.start();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...