Объединение в группу для заполнения временных рядов - PullRequest
0 голосов
/ 04 июля 2018

Я пытаюсь объединить два кадра данных на группу, чтобы заполнить время для каждого пользователя. Рассмотрим следующие фреймы данных pyspark,

df = sqlContext.createDataFrame(
    [
        ('2018-03-01 00:00:00', 'A', 5),
        ('2018-03-01 03:00:00', 'A', 7),
        ('2018-03-01 02:00:00', 'B', 3),
        ('2018-03-01 04:00:00', 'B', 2)
     ],
     ('datetime', 'username', 'count')
)

#and

df1 = sqlContext.createDataFrame(
    [
        ('2018-03-01 00:00:00',1),
        ('2018-03-01 01:00:00', 2),
        ('2018-03-01 02:00:00', 2),
        ('2018-03-01 03:00:00', 3),
        ('2018-03-01 04:00:00', 1),
        ('2018-03-01 05:00:00', 5)
    ],
    ('datetime', 'val')
)

которые производят,

+-------------------+--------+-----+
|           datetime|username|count|
+-------------------+--------+-----+
|2018-03-01 00:00:00|       A|    5|
|2018-03-01 03:00:00|       A|    7|
|2018-03-01 02:00:00|       B|    3|
|2018-03-01 04:00:00|       B|    2|
+-------------------+--------+-----+

#and 

+-------------------+---+
|           datetime|val|
+-------------------+---+
|2018-03-01 00:00:00|  1|
|2018-03-01 01:00:00|  2|
|2018-03-01 02:00:00|  2|
|2018-03-01 03:00:00|  3|
|2018-03-01 04:00:00|  1|
|2018-03-01 05:00:00|  5|
+-------------------+---+

Столбец val из df1 не имеет значения и не нужен в конечном результате, поэтому мы можем его отбросить. В конце ожидаемый результат будет,

+-------------------+--------+-----+
|           datetime|username|count|
+-------------------+--------+-----+
|2018-03-01 00:00:00|       A|    5|
|2018-03-01 01:00:00|       A|    0|
|2018-03-01 02:00:00|       A|    0|
|2018-03-01 03:00:00|       A|    7|
|2018-03-01 04:00:00|       A|    0|
|2018-03-01 05:00:00|       A|    0|
|2018-03-01 00:00:00|       B|    0|
|2018-03-01 01:00:00|       B|    0|
|2018-03-01 02:00:00|       B|    3|
|2018-03-01 03:00:00|       B|    0|
|2018-03-01 04:00:00|       B|    2|
|2018-03-01 05:00:00|       B|    0|
+-------------------+--------+-----+

Я пытался groupBy() и join, но это не сработало. Я также пытался создать функцию и зарегистрировать ее как pandas_udf(), но все равно не сработал, т.е.

df.groupBy('usernames').join(df1, 'datetime', 'right')

и

@pandas_udf('datetime string, username string, count double', F.PandasUDFType.GROUPED_MAP)
def fill_time(df):
    return df.merge(df1, on = 'cdatetime', how = 'right')

Есть предложения?

1 Ответ

0 голосов
/ 04 июля 2018

Просто сопоставьте различные временные метки и имена пользователей и внешнее объединение с данными:

from pyspark.sql.functions import broadcast

(broadcast(df1.select("datetime").distinct())
    .crossJoin(df.select("username").distinct())
    .join(df, ["datetime", "username"], "leftouter")
    .na.fill(0))

Для использования pandas_udf вам понадобится локальный объект в качестве ссылки

from pyspark.sql.functions import PandasUDFType, pandas_udf

def fill_time(df1):
    @pandas_udf('datetime string, username string, count double', PandasUDFType.GROUPED_MAP)
    def _(df):
        df_ = df.merge(df1, on='datetime', how='right')
        df_["username"] = df_["username"].ffill().bfill()
        return df_
    return _

(df.groupBy("username")
    .apply(fill_time(
        df1.select("datetime").distinct().toPandas()
    ))
    .na.fill(0))

но это будет медленнее, чем решение только для SQL.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...