Я пытался протестировать агрегаты на структурированных потоках в Spark. Для проверки я использую следующие два события:
{'type': 'CALL_START', 'id': '0f048308-9b05-4fdf-ab08-f563f2e8c512', 'user_id': '62cebbdc-d491-4530-b665-702832e393db', 'emit_timestamp': 1578921173.922, 'num': 0}
и
{'type': 'CALL_END', 'id': '0f048308-9b05-4fdf-ab08-f563f2e8c512', 'user_id': '62cebbdc-d491-4530-b665-702832e393db', 'emit_timestamp': 1578921179.753, 'num': 1}
Я хочу иметь возможность окончательно рассчитать общее время вызова: (Отметка времени окончания вызова - отметка времени call_started ). Я использовал следующий код:
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, StringType, LongType, TimestampType, DoubleType
from pyspark.sql.functions import from_unixtime, to_timestamp, udf, col, unix_timestamp, lag, pandas_udf, PandasUDFType, expr
from datetime import datetime
def format_timestamp(ts):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
if __name__ == "__main__":
spark = SparkSession.builder \
.appName("redis-example") \
.master("local[*]") \
.config("spark.redis.host", "localhost") \
.config("spark.redis.port", "6379") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
clicks = spark.readStream\
.format("redis")\
.option("stream.keys", "clicks")\
.schema(
StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("num", LongType(), True),
StructField("user_id", StringType(), True),
StructField("emit_timestamp", DoubleType(), True),
])\
)\
.load()
format_timestamp_udf = udf(lambda x: format_timestamp(x))
added_timestamp = clicks.withColumn("timestamp", col("emit_timestamp").cast(TimestampType()))\
.withWatermark('timestamp', '2 hours')
strm = added_timestamp.alias('l').join(added_timestamp.alias('r'),
expr("""
r.timestamp < l.timestamp + interval 1 hour AND
l.id = r.id"""),
"leftOuter"
)\
.select(col('l.id').alias('id'),
col('l.user_id').alias('user_id'),
col('l.timestamp').alias('left_ts'),
col('r.timestamp').alias('right_ts'),
col('l.type').alias('l_type'),
col('r.type').alias('r_type'))
query = strm.writeStream \
.outputMode("append")\
.format("console")\
.start()
query.awaitTermination()
Тот же код выполняется нормально, но пропускает различные результаты. Итак, учитывая следующий входной поток:
Emitting event # 0 {'type': 'CALL_START', 'id': '24ed249b-1316-4d58-91b9-e298d3064ee4', 'user_id': '6487301e-1742-4a37-9c14-d88b7dd84408', 'emit_timestamp': 1578921588.269, 'num': 0}
EMITTING #Calls: 1
Emitting event # 1 {'type': 'CALL_END', 'id': '24ed249b-1316-4d58-91b9-e298d3064ee4', 'user_id': '6487301e-1742-4a37-9c14-d88b7dd84408', 'emit_timestamp': 1578921595.281, 'num': 1}
Emitting event # 2 {'type': 'CALL_START', 'id': '46df9a92-744a-40dc-b26c-cf6e9a670c6e', 'user_id': '8750ba66-7374-4165-84e7-f5a1b7661682', 'emit_timestamp': 1578921604.669, 'num': 2}
EMITTING #Calls: 2
Emitting event # 3 {'type': 'CALL_END', 'id': '46df9a92-744a-40dc-b26c-cf6e9a670c6e', 'user_id': '8750ba66-7374-4165-84e7-f5a1b7661682', 'emit_timestamp': 1578921605.731, 'num': 3}
Emitting event # 5 {'type': 'CALL_START', 'id': '3d17c088-5a88-433f-8efd-2f4c00a5a0b5', 'user_id': 'fa0c94b7-47cf-4803-8c62-81301c495257', 'emit_timestamp': 1578921622.989, 'num': 5}
EMITTING #Calls: 3
Emitting event # 6 {'type': 'CALL_END', 'id': '3d17c088-5a88-433f-8efd-2f4c00a5a0b5', 'user_id': 'fa0c94b7-47cf-4803-8c62-81301c495257', 'emit_timestamp': 1578921629.079, 'num': 6}
Emitting event # 7 {'type': 'CALL_START', 'id': 'e095a663-244f-4adc-8c30-91093ed243d0', 'user_id': '3e3dd96b-9883-4a4f-8c20-7452038611cf', 'emit_timestamp': 1578921632.044, 'num': 7}
EMITTING #Calls: 4
Emitting event # 8 {'type': 'CALL_END', 'id': 'e095a663-244f-4adc-8c30-91093ed243d0', 'user_id': '3e3dd96b-9883-4a4f-8c20-7452038611cf', 'emit_timestamp': 1578921640.505, 'num': 8}
Emitting event # 11 {'type': 'CALL_START', 'id': 'fbce2bf5-85a4-460e-aeb1-83fc55169185', 'user_id': 'ec190ef2-b9ba-4d4e-902f-fad547e8a0d6', 'emit_timestamp': 1578921660.984, 'num': 11}
Emitting event # 12 {'type': 'CALL_START', 'id': '8fc7e98a-dc36-4e8b-83a9-c239b92ccee1', 'user_id': '1d6c3f49-47e0-40d7-b0a2-872c4930deba', 'emit_timestamp': 1578921664.926, 'num': 12}
Emitting event # 13 {'type': 'CALL_START', 'id': '3ef2d93b-95dd-4ca7-a9b4-e692251e68dc', 'user_id': 'f63be859-25ea-422f-852f-88d5b99c62df', 'emit_timestamp': 1578921666.166, 'num': 13}
Выходные данные:
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+
-------------------------------------------
Batch: 1
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+--------------------+--------------------+--------------------+--------+----------+
| id| user_id| left_ts| right_ts| l_type| r_type|
+--------------------+--------------------+--------------------+--------------------+--------+----------+
|3d17c088-5a88-433...|fa0c94b7-47cf-480...|2020-01-13 18:50:...|2020-01-13 18:50:...|CALL_END|CALL_START|
+--------------------+--------------------+--------------------+--------------------+--------+----------+
-------------------------------------------
Batch: 3
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+
-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+
-------------------------------------------
Batch: 5
-------------------------------------------
+---+-------+-------+--------+------+------+
| id|user_id|left_ts|right_ts|l_type|r_type|
+---+-------+-------+--------+------+------+
+---+-------+-------+--------+------+------+
В последнем добавленном кадре данных отсутствуют явные записи: вход отправил 4 завершенных вызова, в то время как наш Сценарий получает только один из этих вызовов.