pandas: применять фильтры с учетом отметки времени - PullRequest
1 голос
/ 02 апреля 2020

У меня есть следующие тестовые данные:

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06', '2014-01-07'],
     'id': [1, 2, 2, 3, 4, 4, 5], 'name': ['Darren', 'Sabrina', 'Steve', 'Sean', 'Ray', 'Stef', 'Dany']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

Вопрос заключается в следующем: Возвращаясь ко времени x дней (если смотреть из каждой записи), существует ли более y разных имен, которые имеют один и тот же идентификатор?

Вот код, который я написал. В моем примере я go возвращаюсь x = 2 дня и проверяю как минимум два разных имени (y = 1), имеющих один и тот же идентификатор. Если существуют хотя бы два разных имени, я сохраняю 1 в списке «result_store», иначе 0. Конечно, в этом примере возврат назад x дней невозможен, если i меньше x, но эта небольшая неточность не является проблемой для me.

def rule(data, x=2, y=1):
    result_store = []
    for i in range(data.shape[0]):
        id = data['id'][i]
        end_time = data['date'][i]
        start_time = end_time-datetime.timedelta(days=x)
        time_frame = data[(data['date'] >= start_time) & (data['date'] <= end_time)]
        time_frame = time_frame.loc[time_frame['id'] == id]
        distinct_names = time_frame['name'].nunique()
        if distinct_names > y:
            result_store.append(1)
        else:
            result_store.append(0)

    return result_store

Результат -

[0, 0, 1, 0, 0, 1, 0]

На самом деле у меня тысячи строк, и мое решение очень медленное. Я также попытался распараллелить индекс i, используя parmap, но ускорение также не является удовлетворительным. Есть ли более эффективный способ сделать это? Может быть, с помощью pyspark?

Спасибо!

1 Ответ

1 голос
/ 02 апреля 2020

Это будет работать для spark2.4 (array_distinct только в 2.4 ). Я использовал DataFrame , который вы указали, и Sparkrered для даты столбца типа TimestampType. Чтобы мой искровой код работал, дата столбца должна иметь тип TimestampType. Функция window отправляется обратно на 2 дня на основе с тем же идентификатором и собирает список имен. Если число различных имен > 1, то вводится 1, в противном случае 0.

В приведенном ниже коде используется rangeBetween(-(86400*2),Window.currentRow), что в основном означает, что для включения currentRow а затем go назад 2 дня , поэтому, если текущая дата строки равна 3, она будет включать [3,2,1]. если вам нужна только текущая дата строки и 1 день раньше, вы можете заменить 86400*2 на 86400*1

#If you can't use spark2.4 or get stuck, please leave a comment. 



from pyspark.sql import functions as F
from pyspark.sql.window import Window

df=spark.createDataFrame(data)

w=Window().partitionBy("id").orderBy((F.col("date")).cast("long")).rangeBetween(-(86400*2),Window.currentRow)
df.withColumn("no_distinct", F.size(F.array_distinct(F.collect_list("name").over(w))))\
  .withColumn("no_distinct", F.when(F.col("no_distinct")>1, F.lit(1)).otherwise(F.lit(0)))\
  .orderBy(F.col("date")).show()

+-------------------+---+-------+-----------+
|               date| id|   name|no_distinct|
+-------------------+---+-------+-----------+
|2014-01-01 00:00:00|  1| Darren|          0|
|2014-01-02 00:00:00|  2|Sabrina|          0|
|2014-01-03 00:00:00|  2|  Steve|          1|
|2014-01-04 00:00:00|  3|   Sean|          0|
|2014-01-05 00:00:00|  4|    Ray|          0|
|2014-01-06 00:00:00|  4|   Stef|          1|
|2014-01-07 00:00:00|  5|   Dany|          0|
+-------------------+---+-------+-----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...