Оптимизация pandas кадра данных применяется для декодирования кадров CAN - итерации по строкам - PullRequest
3 голосов
/ 17 января 2020

Я пытаюсь сократить время, необходимое для применения сложной функции из библиотеки cantools к каждой строке в кадре данных (до 2 миллионов строк): Timestamp Type ID Data 0 16T122109957 0 522 b'0006' 1 16T122109960 0 281 b'0000ce52d2290000' 2 16T122109960 0 279 b'0000000000000000' 3 16T122109960 0 304 b'0000' 4 16T122109961 0 277 b'400000'

Использование приведенного выше кадра данных и дБ c файл считан. Файл DB c представляет собой набор правил кодирования / декодирования данных.

Использование DataFrame может занять до 10 минут:

df['decoded'] = df.apply(lambda x: dbc.decode_message(df['ID'][x], df['Data']))

Помещение двух столбцов в списки и последующая итерация по спискам занимает всего около минуты, но когда новый массив сохраняется в кадре данных, появляется ошибка ValueError: array is too big. Что ожидается, так как это ОГРОМНО.

Пример l oop код:

id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
    listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts)

Я попытался python векторизация, которая, по-видимому, самая быстрая и была встречена с ошибкой TypeError: 'Series' objects are mutable, thus they cannot be hashed который я не могу исправить. пример:

Data['dict'] = dbc.decode_message(df['ID'], df['Data'])

Есть ли другие способы ускорить процесс применения или мне следует попробовать работать над векторизацией?

МИНИМАЛЬНЫЙ пример:

import cantools
import pandas as pd

df = pd.read_csv('file.log', skiprows=11, sep=';')
dbc = cantools.database.load_file('file.dbc')

# option 1 SLOW
df['decoded'] = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']))

# option 2 Faster...
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
    listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts) #< -- causes error for being to big

#option 3
df['dict'] = dbc.decode_message(df['ID'], df['Data']) #< --Error

Ответы [ 2 ]

1 голос
/ 20 января 2020

Публикуем это как ответ, но YMMV:

Пока библиотека cantools не поддерживает работу с Series или DataFrame объектами, векторизация не будет работать. Таким образом, использование apply является единственным способом go.

Поскольку преобразование db c работает построчно без каких-либо межстрочных зависимостей, вы должны иметь возможность его распараллелить.

Вам нужно

  • Написать функцию, выполняющую преобразование, с использованием фрейма данных:

    def decode(df):
        df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
        return df
    
  • вызвать его так:

    import pandas as pd
    import numpy as np
    import multiprocessing as mp
    
    def parallelApply(df, func, numChunks=4):
        df_split = np.array_split(df, numChunks)
        pool = mp.Pool(numChunks)
        df = pd.concat(pool.map(func, df_split))
        pool.close()
        pool.join()
        return df
    
    df = parallelApply(df, decode)
    

Что делает parallelApply, это разбивает фрейм данных на numChunks куски и создает многопроцессорный пул с таким количеством записей.

Затем применяется функция func (которая равен decode в вашем случае) каждому из блоков в отдельном процессе.

decode возвращает обновленный блок данных, и pd.concat объединит их снова.


Существует также очень удобная библиотека pandarallel, которая сделает это за вас, но вы будете вынуждены использовать WSL при работе на Windows.:

pip install pandarallel

После вызова

from pandarallel import pandarallel
pandarallel.initialize()

вы просто конвертируете вызов из

df.apply(...)

в

df.parallel_apply(func)

L ibrary ускоряет несколько процессов и позволяет каждому процессу обрабатывать подмножество данных.

0 голосов
/ 24 января 2020

Адаптировано из ответа М. Спиллера - различия указаны в скобках:

(импорт). Они должны быть импортированы: из multiprocessing.dummy import freeze_support

import cantools
import pandas as pd
from itertools import repeat
import multiprocessing as mp

Написать функцию, выполнив преобразование, принимающее фрейм данных (и передаваемый для декодирования db c):

def decode(df, dbc):
    df2 = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
    df2 = pd.DataFrame
    return df2

, вызываем его так (передавая db c через функции):

def parallel_apply(df, func, dbc=None, numChunks=mp.cpu_count()):
    df_split = np.array_split(df, numChunks)
    pool = mp.Pool(numChunks)

    df2 = pd.concat(pool.starmap(func, zip(df_split, repeat(dbc))))
    pool.close()
    pool.join()
    return df2

Freeze_support()
#read in dbc
#read in df with encoded CAN messages
df2 = parallel_apply(df, decode, dbc)

Реализуйте функции чтения, где комментарии были размещены. Это решение будет использовать все ядра ЦП и разделить задачу на 4 части - параллельный процесс и в конце воссоединиться с фреймом данных.

...