Проблема : При попытке выполнить каскадное самосоединение в потоковом DF pyspark получается пустой результат. Простое самостоятельное соединение работает нормально, но не каскадно. Перед объединением нет операций агрегирования.
Пример : я работаю с потоковым DF, содержащим три столбца: user_id
, action_id
и timestamp
. Я хочу определить последовательность действий, выполняемых пользователем. Например, когда наблюдается последовательность action_id
, такая как a1
, a2
и a3
, выполняемая тем же user_id
.
Код
Ввод: источник Kafka с полем значения в форме:
{"action_id":"a1","timestamp":1583301386000,"user_id":"u0"}
{"action_id":"a2","timestamp":1583301387000,"user_id":"u0"}
{"action_id":"a3","timestamp":1583301388000,"user_id":"u0"}
Функция, которая выбирает спецификацию c action_id
и добавляет суффикс к user_id
и timestamp
:
def select_action_id(events_df, action_id, idx):
return events_df.select(\
col("user_id").alias("user_id"+"_"+str(idx)),\
col("timestamp").alias("timestamp"+"_"+str(idx)),\
.where(col("action_id") == action_id)
Каскадное самосоединение для определения последовательности action_id
:
def get_sequence(events_df, action_ids):
joined_df = None
for action_id in action_ids:
action_df = select_action_id(events_df, action_id, 0)
if joined_df is not None:
joined_df = joined_df.join(action_df,
expr("""
user_id == user_id_0 AND
timestamp_0 >= final_timestamp AND
timestamp_0 <= initial_timestamp + interval 24 hours
"""))
joined_df = joined_df\
.drop("user_id_0", "final_timestamp")\
.withColumnRenamed("timestamp_0", "final_timestamp")
else:
joined_df = action_df
joined_df = joined_df\
.withColumnRenamed("timestamp_0", "final_timestamp")\
.withColumnRenamed("user_id_0", "user_id")\
.withColumn("initial_timestamp", col("final_timestamp"))
return joined_df
Ожидаемый результат
+-------+-------------------+-------------------+
|user_id|initial_timestamp |final_timestamp |
+-------+-------------------+-------------------+
|u0 |2020-03-04 06:56:26|2020-03-04 06:56:28|
+-------+-------------------+-------------------+
Полученный результат
+-------+-----------------+---------------+
|user_id|initial_timestamp|final_timestamp|
+-------+-----------------+---------------+
+-------+-----------------+---------------+
Дополнительная информация : предлагаемое решение отлично работает, когда actions_ids = ["a1", "a2"]
(т.е. задействовано только одно соединение), но не когда actions_ids = ["a1", "a2", "a3"]
.
метки времени преобразуются из unix в UT C.
Это мой первый вопрос переполнения стека :) Я прошу прощения, если описание проблемы недостаточно ясное или я нарушил какие-либо соглашения