Spark Структурированные потоковые каскадные самоприсоединения (Pyspark) - PullRequest
0 голосов
/ 10 апреля 2020

Проблема : При попытке выполнить каскадное самосоединение в потоковом DF pyspark получается пустой результат. Простое самостоятельное соединение работает нормально, но не каскадно. Перед объединением нет операций агрегирования.

Пример : я работаю с потоковым DF, содержащим три столбца: user_id, action_id и timestamp. Я хочу определить последовательность действий, выполняемых пользователем. Например, когда наблюдается последовательность action_id, такая как a1, a2 и a3, выполняемая тем же user_id.

Код

Ввод: источник Kafka с полем значения в форме:

{"action_id":"a1","timestamp":1583301386000,"user_id":"u0"}
{"action_id":"a2","timestamp":1583301387000,"user_id":"u0"}
{"action_id":"a3","timestamp":1583301388000,"user_id":"u0"}

Функция, которая выбирает спецификацию c action_id и добавляет суффикс к user_id и timestamp:

def select_action_id(events_df, action_id, idx):

    return events_df.select(\
        col("user_id").alias("user_id"+"_"+str(idx)),\
        col("timestamp").alias("timestamp"+"_"+str(idx)),\
        .where(col("action_id") == action_id)

Каскадное самосоединение для определения последовательности action_id:

def get_sequence(events_df, action_ids):

    joined_df = None

    for action_id in action_ids:
        action_df = select_action_id(events_df, action_id, 0)

        if joined_df is not None:
            joined_df = joined_df.join(action_df,
            expr("""
                user_id == user_id_0 AND
                timestamp_0 >= final_timestamp AND
                timestamp_0 <= initial_timestamp + interval 24 hours
                """))

            joined_df = joined_df\
                .drop("user_id_0", "final_timestamp")\
                .withColumnRenamed("timestamp_0", "final_timestamp")

        else: 
            joined_df = action_df
            joined_df = joined_df\
                .withColumnRenamed("timestamp_0", "final_timestamp")\
                .withColumnRenamed("user_id_0", "user_id")\
                .withColumn("initial_timestamp", col("final_timestamp"))

    return joined_df

Ожидаемый результат

+-------+-------------------+-------------------+
|user_id|initial_timestamp  |final_timestamp    |
+-------+-------------------+-------------------+
|u0     |2020-03-04 06:56:26|2020-03-04 06:56:28|
+-------+-------------------+-------------------+

Полученный результат

+-------+-----------------+---------------+
|user_id|initial_timestamp|final_timestamp|
+-------+-----------------+---------------+
+-------+-----------------+---------------+

Дополнительная информация : предлагаемое решение отлично работает, когда actions_ids = ["a1", "a2"] (т.е. задействовано только одно соединение), но не когда actions_ids = ["a1", "a2", "a3"].

метки времени преобразуются из unix в UT C.

Это мой первый вопрос переполнения стека :) Я прошу прощения, если описание проблемы недостаточно ясное или я нарушил какие-либо соглашения

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...