Проблема с Spark Structural Streaming Output Mode - PullRequest
1 голос
/ 08 января 2020

В настоящее время я ищу подходящее решение для решения следующей проблемы с использованием Spark Structured Streaming API. Я просмотрел много постов в блоге и Stackoverflow. К сожалению, я до сих пор не могу найти решение этой проблемы. Следовательно, повышение этого билета для вызова помощи экспертов.

Вариант использования

Пусть говорят, что у меня есть Kafka Topi c (user_creation_log), который имеет все в режиме реального времени user_creation_event. Для тех пользователей, которые не совершали никаких транзакций в течение 10 секунд, 20 секунд и 30 секунд, мы назначим им определенный ваучер. (время windows сокращено для целей тестирования)

Отметить и отправить строку тайм-аута (более 10 сек c, более 20 сек c, более 30 секунд) в Kafka самая проблемная c часть !!! Слишком много правил, или, может быть, я должен разбить его на 10 c, 20 c и 30 секунд на другой сценарий

Моя таблица отслеживания

enter image description here

Я могу отследить пользователя no_action_sec с помощью флага no_action_10sec, no_action_20sec, no_action_30sec (показано в коде ниже). no_action_sec является производным от (current_time - creation_time), которое будет рассчитываться в каждой микропакете.

Полный режим вывода outputMode("complete") записывает все строки таблицы результатов (и соответствует традиционному пакетному структурированному запросу).

Обновление Режим вывода outputMode("update") записывает только те строки, которые были обновлены (каждый раз, когда есть обновления).

В этом случае Обновить режим вывода кажется очень подходящим, потому что он будет написать обновленную строку для вывода. Однако всякий раз, когда flag10, flag20, flag30 columns был обновлен, строка не записывала в нужное место.


from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("Notification") \
    .getOrCreate()

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
split_col=split(lines.value, ' ')
df = lines.withColumn('user_id', split_col.getItem(0))
df = df.withColumn('create_date_time', split_col.getItem(1)) \
    .groupBy("user_id","create_date_time").count() 
df = df.withColumn("create_date_time",col("create_date_time").cast(LongType())) \
       .withColumn("no_action_sec",  current_timestamp().cast(LongType())  -col("create_date_time").cast(LongType())    ) \
       .withColumn("no_action_10sec", when(col("no_action_sec") >= 10 ,True)) \
       .withColumn("no_action_20sec", when(col("no_action_sec") >= 20 ,True)) \
       .withColumn("no_action_30sec", when(col("no_action_sec") >= 30 ,True)) \


query = df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

Текущий вывод

UserId = 0 исчезает в пакете 2. Он должен отображаться, потому что no_action_30sec изменится с нуля на True.

enter image description here

Ожидаемый результат Пользователь Идентификатор должен быть записан на выход 3 раза, как только он вызовет флаг logi c 10 se c, 20 se c и 30 se c

Может кто-нибудь пролить свет на эту проблему? Например, что я могу сделать, чтобы позволить строкам записывать в вывод, когда no_action_10sec, no_action_20sec, no_action_30sec имеет значение True?

Отладка

OutputMode = Complete выдаст слишком много избыточных данных OutputMode=

Генератор фиктивных данных

for i in {0..10000}; do echo "${i} $(date +%s)"; sleep 1; done | nc -lk 9999

Предположим, что строка показывается в режиме консоли (.format("console")) отправит Kafka для действия цепочки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...