В настоящее время я ищу подходящее решение для решения следующей проблемы с использованием Spark Structured Streaming API. Я просмотрел много постов в блоге и Stackoverflow. К сожалению, я до сих пор не могу найти решение этой проблемы. Следовательно, повышение этого билета для вызова помощи экспертов.
Вариант использования
Пусть говорят, что у меня есть Kafka Topi c (user_creation_log), который имеет все в режиме реального времени user_creation_event. Для тех пользователей, которые не совершали никаких транзакций в течение 10 секунд, 20 секунд и 30 секунд, мы назначим им определенный ваучер. (время windows сокращено для целей тестирования)
Отметить и отправить строку тайм-аута (более 10 сек c, более 20 сек c, более 30 секунд) в Kafka самая проблемная c часть !!! Слишком много правил, или, может быть, я должен разбить его на 10 c, 20 c и 30 секунд на другой сценарий
Моя таблица отслеживания
Я могу отследить пользователя no_action_sec
с помощью флага no_action_10sec
, no_action_20sec
, no_action_30sec
(показано в коде ниже). no_action_sec
является производным от (current_time - creation_time), которое будет рассчитываться в каждой микропакете.
Полный режим вывода outputMode("complete")
записывает все строки таблицы результатов (и соответствует традиционному пакетному структурированному запросу).
Обновление Режим вывода outputMode("update")
записывает только те строки, которые были обновлены (каждый раз, когда есть обновления).
В этом случае Обновить режим вывода кажется очень подходящим, потому что он будет написать обновленную строку для вывода. Однако всякий раз, когда flag10, flag20, flag30 columns
был обновлен, строка не записывала в нужное место.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.appName("Notification") \
.getOrCreate()
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
split_col=split(lines.value, ' ')
df = lines.withColumn('user_id', split_col.getItem(0))
df = df.withColumn('create_date_time', split_col.getItem(1)) \
.groupBy("user_id","create_date_time").count()
df = df.withColumn("create_date_time",col("create_date_time").cast(LongType())) \
.withColumn("no_action_sec", current_timestamp().cast(LongType()) -col("create_date_time").cast(LongType()) ) \
.withColumn("no_action_10sec", when(col("no_action_sec") >= 10 ,True)) \
.withColumn("no_action_20sec", when(col("no_action_sec") >= 20 ,True)) \
.withColumn("no_action_30sec", when(col("no_action_sec") >= 30 ,True)) \
query = df \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
Текущий вывод
UserId = 0 исчезает в пакете 2. Он должен отображаться, потому что no_action_30sec
изменится с нуля на True.
Ожидаемый результат Пользователь Идентификатор должен быть записан на выход 3 раза, как только он вызовет флаг logi c 10 se c, 20 se c и 30 se c
Может кто-нибудь пролить свет на эту проблему? Например, что я могу сделать, чтобы позволить строкам записывать в вывод, когда no_action_10sec
, no_action_20sec
, no_action_30sec
имеет значение True?
Отладка
OutputMode = Complete выдаст слишком много избыточных данных
Генератор фиктивных данных
for i in {0..10000}; do echo "${i} $(date +%s)"; sleep 1; done | nc -lk 9999
Предположим, что строка показывается в режиме консоли (.format("console")
) отправит Kafka для действия цепочки