Самосоединение структурированных потоков для группировки входящих событий возвращает частичные результаты - PullRequest
0 голосов
/ 13 января 2020

Я пытался протестировать агрегаты на структурированных потоках в Spark. Для проверки я использую следующие два события:

{'type': 'CALL_START', 'id': '0f048308-9b05-4fdf-ab08-f563f2e8c512', 'user_id': '62cebbdc-d491-4530-b665-702832e393db', 'emit_timestamp': 1578921173.922, 'num': 0}

и

{'type': 'CALL_END', 'id': '0f048308-9b05-4fdf-ab08-f563f2e8c512', 'user_id': '62cebbdc-d491-4530-b665-702832e393db', 'emit_timestamp': 1578921179.753, 'num': 1}

Я хочу иметь возможность окончательно рассчитать общее время вызова: (Отметка времени окончания вызова - отметка времени call_started ). Я использовал следующий код:

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, StringType, LongType, TimestampType, DoubleType
from pyspark.sql.functions import from_unixtime, to_timestamp, udf, col, unix_timestamp, lag, pandas_udf, PandasUDFType, expr
from datetime import datetime


def format_timestamp(ts):
    return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("redis-example") \
        .master("local[*]") \
        .config("spark.redis.host", "localhost") \
        .config("spark.redis.port", "6379") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    clicks = spark.readStream\
        .format("redis")\
        .option("stream.keys", "clicks")\
        .schema(
            StructType([
                StructField("id", StringType(), True),
                StructField("type", StringType(), True),
                StructField("num", LongType(), True),
                StructField("user_id", StringType(), True),
                StructField("emit_timestamp", DoubleType(), True),
            ])\
        )\
        .load()


    format_timestamp_udf = udf(lambda x: format_timestamp(x))
    added_timestamp =  clicks.withColumn("timestamp", col("emit_timestamp").cast(TimestampType()))\
                            .withWatermark('timestamp', '2 hours')
    strm = added_timestamp.alias('l').join(added_timestamp.alias('r'),
                                        expr("""
                                           r.timestamp < l.timestamp + interval 1 hour AND
                                           l.id = r.id"""),
                                           "leftOuter"
                                        )\
                .select(col('l.id').alias('id'),
                        col('l.user_id').alias('user_id'),
                        col('l.timestamp').alias('left_ts'),
                        col('r.timestamp').alias('right_ts'),
                        col('l.type').alias('l_type'),
                        col('r.type').alias('r_type'))

    query = strm.writeStream \
        .outputMode("append")\
        .format("console")\
        .start()
    query.awaitTermination()

Тот же код выполняется нормально, но пропускает различные результаты. Итак, учитывая следующий входной поток:

Emitting event # 0 {'type': 'CALL_START', 'id': '24ed249b-1316-4d58-91b9-e298d3064ee4', 'user_id': '6487301e-1742-4a37-9c14-d88b7dd84408', 'emit_timestamp': 1578921588.269, 'num': 0}
EMITTING #Calls:  1
Emitting event # 1 {'type': 'CALL_END', 'id': '24ed249b-1316-4d58-91b9-e298d3064ee4', 'user_id': '6487301e-1742-4a37-9c14-d88b7dd84408', 'emit_timestamp': 1578921595.281, 'num': 1}
Emitting event # 2 {'type': 'CALL_START', 'id': '46df9a92-744a-40dc-b26c-cf6e9a670c6e', 'user_id': '8750ba66-7374-4165-84e7-f5a1b7661682', 'emit_timestamp': 1578921604.669, 'num': 2}
EMITTING #Calls:  2
Emitting event # 3 {'type': 'CALL_END', 'id': '46df9a92-744a-40dc-b26c-cf6e9a670c6e', 'user_id': '8750ba66-7374-4165-84e7-f5a1b7661682', 'emit_timestamp': 1578921605.731, 'num': 3}
Emitting event # 5 {'type': 'CALL_START', 'id': '3d17c088-5a88-433f-8efd-2f4c00a5a0b5', 'user_id': 'fa0c94b7-47cf-4803-8c62-81301c495257', 'emit_timestamp': 1578921622.989, 'num': 5}
EMITTING #Calls:  3
Emitting event # 6 {'type': 'CALL_END', 'id': '3d17c088-5a88-433f-8efd-2f4c00a5a0b5', 'user_id': 'fa0c94b7-47cf-4803-8c62-81301c495257', 'emit_timestamp': 1578921629.079, 'num': 6}
Emitting event # 7 {'type': 'CALL_START', 'id': 'e095a663-244f-4adc-8c30-91093ed243d0', 'user_id': '3e3dd96b-9883-4a4f-8c20-7452038611cf', 'emit_timestamp': 1578921632.044, 'num': 7}
EMITTING #Calls:  4
Emitting event # 8 {'type': 'CALL_END', 'id': 'e095a663-244f-4adc-8c30-91093ed243d0', 'user_id': '3e3dd96b-9883-4a4f-8c20-7452038611cf', 'emit_timestamp': 1578921640.505, 'num': 8}
Emitting event # 11 {'type': 'CALL_START', 'id': 'fbce2bf5-85a4-460e-aeb1-83fc55169185', 'user_id': 'ec190ef2-b9ba-4d4e-902f-fad547e8a0d6', 'emit_timestamp': 1578921660.984, 'num': 11}
Emitting event # 12 {'type': 'CALL_START', 'id': '8fc7e98a-dc36-4e8b-83a9-c239b92ccee1', 'user_id': '1d6c3f49-47e0-40d7-b0a2-872c4930deba', 'emit_timestamp': 1578921664.926, 'num': 12}
Emitting event # 13 {'type': 'CALL_START', 'id': '3ef2d93b-95dd-4ca7-a9b4-e692251e68dc', 'user_id': 'f63be859-25ea-422f-852f-88d5b99c62df', 'emit_timestamp': 1578921666.166, 'num': 13}

Выходные данные:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+--------------------+--------------------+--------------------+--------+----------+
|                  id|             user_id|             left_ts|            right_ts|  l_type|    r_type|
+--------------------+--------------------+--------------------+--------------------+--------+----------+
|3d17c088-5a88-433...|fa0c94b7-47cf-480...|2020-01-13 18:50:...|2020-01-13 18:50:...|CALL_END|CALL_START|
+--------------------+--------------------+--------------------+--------------------+--------+----------+

-------------------------------------------
Batch: 3
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+

-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+

-------------------------------------------
Batch: 5
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+

В последнем добавленном кадре данных отсутствуют явные записи: вход отправил 4 завершенных вызова, в то время как наш Сценарий получает только один из этих вызовов.

...