Лучший способ обработать поток кликов для создания функций в Pandas - PullRequest
0 голосов
/ 04 июля 2018

Я обрабатываю фрейм данных с потоком кликов и извлекаю функции для каждого пользователя в потоке кликов, которые будут использоваться в проекте машинного обучения.

Фрейм данных выглядит примерно так:

data = pd.DataFrame({'id':['A01','B01','A01','C01','A01','B01','A01'],
                     'event':['search','search','buy','home','cancel','home','search'],
                     'date':['2018-01-01','2018-01-01','2018-01-02','2018-01-03','2018-01-04','2018-01-04','2018-01-06'],
                     'product':['tablet','dvd','tablet','tablet','tablet','book','book'],
                     'price': [103,2,203,103,203,21,21]})
data['date'] = pd.to_datetime(data['date'])

Так как я должен создавать функции для каждого пользователя, я использую групповую настройку / применение с пользовательской функцией, такой как:

featurized = data.groupby('id').apply(featurize)

Создание пользовательских функций займет часть информационного кадра и создаст много (сотни) функций. Весь процесс слишком медленный, поэтому я ищу рекомендацию, чтобы сделать это более эффективно.

Пример функции, используемой для создания объектов:

def featurize(group):
    features = dict()

    # Userid
    features['id'] = group['id'].max()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event']=='search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product']=='tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1,'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product']=='tablet','price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product']=='tablet','price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product']=='tablet','price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product']=='tablet','price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product']=='tablet','price'].std()
    return pd.Series(features)

Одна потенциальная проблема заключается в том, что каждая функция потенциально сканирует весь блок, поэтому, если у меня есть 100 функций, я сканирую блок 100 раз вместо одного.

Например, функция может быть числом «планшетных» событий, которые есть у пользователя, другая может быть числом «домашних» событий, другая может быть средней разницей во времени между «поисковыми» событиями, а затем средней разницей во времени между » поиск "событий" для "планшетов" и т. д. и т. д. Каждая функция может быть закодирована как функция, которая берет фрагмент (df) и создает функцию, но когда у нас есть сотни функций, каждая из них сканирует весь фрагмент, когда достаточно одного линейного сканирования , Проблема в том, что код станет уродливым, если я сделаю руководство для цикла над каждой записью в чанке и закодирую все функции в цикле.

Вопросы:

  1. Если мне придется обрабатывать фрейм данных сотни раз, есть ли способ абстрагировать это за одно сканирование, которое создаст все необходимые функции?

  2. Есть ли улучшение скорости по сравнению с подходом группового режима / применения, который я сейчас использую?

Ответы [ 2 ]

0 голосов
/ 13 июля 2018

DataFrame.agg () ваш друг здесь. Вы правы в том, что начальный реализованный метод повторяется по всему набору данных для КАЖДОГО вызова. Итак, что мы можем сделать, это определить всю тяжелую работу, которую мы хотим сделать вначале, и позволить пандам обрабатывать все внутренние оптимизации. Обычно с этими библиотеками ОЧЕНЬ редко бывают случаи, когда вы можете кодировать что-то, что будет лучше, используя только внутренние библиотеки.

Что хорошо в этом методе, так это то, что вам нужно выполнить этот сложный расчет только ОДИН РАЗ, а затем вы можете выполнить все тонко настроенные функции для создания отфильтрованного набора данных, поскольку он намного быстрее.

Это сокращает время выполнения на 65%, что довольно много. Кроме того, в следующий раз, когда вы захотите получить новую статистику, вы можете просто получить доступ к результату featurize2 и больше не запускать вычисления.

df = make_data()
# include this to be able to calculate standard deviations correctly
df['price_sq'] = df['price'] ** 2.

def featurize2(df):
    grouped = df.groupby(['id', 'product', 'event'])
    initial = grouped.agg({'price': ['count', 'max', 'min', 'mean', 'std', 'sum', 'size'], 'date': [
        'max', 'min'], 'price_sq': ['sum']}).reset_index()
    return initial

def featurize3(initial):

    # Features 5-8
    features = initial.groupby('product').sum()['price']['count'].agg(['max', 'min', 'mean', 'std']).rename({
        'max': 'max_product_events',
        'min': 'min_product_events',
        'mean': 'mean_product_events',
        'std': 'std_product_events'
    })

    searches = initial[initial['event'] == 'search']['price']

    # Feature 1: Number of search events
    features['number_of_search_events'] = searches['count'].sum()

    tablets = initial[initial['product'] == 'tablet']['price']
    tablets_sq = initial[initial['product'] == 'tablet']['price_sq']

    # Feature 2: Number of tablets
    features['number_of_tablets'] = tablets['count'].sum()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = tablets['sum'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = tablets['max'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = tablets['min'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = (
        tablets['mean'] * tablets['count']).sum() / tablets['count'].sum()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = np.sqrt(tablets_sq['sum'].sum(
    ) / tablets['count'].sum() - features['tablet_price_mean'] ** 2.)

    # Feature 3: Total time
    features['total_time'] = (initial['date']['max'].max(
    ) - initial['date']['min'].min()) / np.timedelta64(1, 'D')

    # Feature 4: Total number of events
    features['events'] = initial['price']['count'].sum()

    return features

def new_featurize(df):
    initial = featurize2(df)
    final = featurize3(initial)
    return final

original = featurize(df)
final = new_featurize(df)

for x in final.index:
    print("outputs for index {} are equal: {}".format(
        x, np.isclose(final[x], original[x])))

print("featurize(df): {}".format(timeit.timeit("featurize(df)",
                    "from __main__ import featurize, df", number=3)))
print("featurize2(df): {}".format(timeit.timeit("featurize2(df)",
                    "from __main__ import featurize2, df", number=3)))
print("new_featurize(df): {}".format(timeit.timeit("new_featurize(df)",
                    "from __main__ import new_featurize, df", number=3)))

for x in final.index:
    print("outputs for index {} are equal: {}".format(
        x, np.isclose(final[x], original[x])))

Результаты

featurize(df): 76.0546050072
featurize2(df): 26.5458261967
new_featurize(df): 26.4640090466
outputs for index max_product_events are equal: [ True]
outputs for index min_product_events are equal: [ True]
outputs for index mean_product_events are equal: [ True]
outputs for index std_product_events are equal: [ True]
outputs for index number_of_search_events are equal: [ True]
outputs for index number_of_tablets are equal: [ True]
outputs for index tablet_price_sum are equal: [ True]
outputs for index tablet_price_max are equal: [ True]
outputs for index tablet_price_min are equal: [ True]
outputs for index tablet_price_mean are equal: [ True]
outputs for index tablet_price_std are equal: [ True]
outputs for index total_time are equal: [ True]
outputs for index events are equal: [ True]
0 голосов
/ 09 июля 2018

Отказ от ответственности: следующий ответ не дает правильного ответа на вышеуказанный вопрос. Просто оставил это здесь ради вложенной работы. Может быть, в какой-то момент это пригодится.

  1. повторно использовать выборки данных (например, group.loc[group['product']=='tablet','price'])
  2. параллелизм (например, Распараллеливание применяется после групп панд ; см. Код ниже)
  3. использовать кэш, если вы выполняете вычисления несколько раз (например, HDFStore)
  4. избегать строковых операций; использовать нативные типы, которые могут быть эффективно представлены в numpy
  5. если вам действительно нужны строки, используйте категориальные столбцы (если они представляют категориальные данные ..)
  6. если кадры действительно большие, рассмотрите возможность использования кусков (например, рабочие потоки "больших данных" с использованием панд )
  7. используйте cython для дальнейших (потенциально радикальных) улучшений

Что касается (1), учитывая ваш код, указанный выше, я могу увеличить скорость до 43% (процессор i7-7700HQ, 16 ГБ ОЗУ).

Задержка

using joblib: 68.86841534099949s
using multiprocessing: 71.53540843299925s
single-threaded: 119.05010353899888s

Код

import pandas as pd
import numpy as np
import time
import timeit
import os
import joblib
import multiprocessing


import pandas as pd
import numpy as np
import timeit
import joblib
import multiprocessing


def make_data():
    # just some test data ...
    n_users = 100
    events = ['search', 'buy', 'home', 'cancel']
    products = ['tablet', 'dvd', 'book']
    max_price = 1000

    n_duplicates = 1000
    n_rows = 40000

    df = pd.DataFrame({
        'id': list(map(str, np.random.randint(0, n_users, n_rows))),
        'event': list(map(events.__getitem__, np.random.randint(0, len(events), n_rows))),
        'date': list(map(pd.to_datetime, np.random.randint(0, 100000, n_rows))),
        'product': list(map(products.__getitem__, np.random.randint(0, len(products), n_rows))),
        'price': np.random.random(n_rows) * max_price
    })
    df = pd.concat([df for _ in range(n_duplicates)])
    df.to_pickle('big_df.pkl')
    return df


def data():
    return pd.read_pickle('big_df.pkl')


def featurize(group):
    features = dict()

    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.DataFrame.from_records(features, index=[group['id'].max()])


# https://stackoverflow.com/questions/26187759/parallelize-apply-after-pandas-groupby
def apply_parallel_job(dfGrouped, func):
    retLst = joblib.Parallel(n_jobs=multiprocessing.cpu_count())(
        joblib.delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)


def apply_parallel_pool(dfGrouped, func):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
        ret_list = list(p.map(func, [group for name, group in dfGrouped]))
    return pd.concat(ret_list)


featurized_job = lambda df: apply_parallel_job(df.groupby('id'), featurize)
featurized_pol = lambda df: apply_parallel_pool(df.groupby('id'), featurize)
featurized_sng = lambda df: df.groupby('id').apply(featurize)

make_data()
print(timeit.timeit("featurized_job(data())", "from __main__ import featurized_job, data", number=3))
print(timeit.timeit("featurized_sng(data())", "from __main__ import featurized_sng, data", number=3))
print(timeit.timeit("featurized_pol(data())", "from __main__ import featurized_pol, data", number=3))

Что касается (7), рассмотрим следующую рефакторизацию:

Задержка

original: 112.0091859719978s
re-used prices: 83.85681765000118s

Код

# [...]
prices_ = group.loc[group['product'] == 'tablet', 'price']
features['tablet_price_sum'] = prices_.sum()
# Feature 10 max price for tablet products
features['tablet_price_max'] = prices_.max()
# Feature 11 min price for tablet products
features['tablet_price_min'] = prices_.min()
# Feature 12 mean price for tablet products
features['tablet_price_mean'] = prices_.mean()
# Feature 13 std price for tablet products
features['tablet_price_std'] = prices_.std()
# [...]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...