Работающий фрейм данных применяется параллельно с частичным - PullRequest
0 голосов
/ 22 января 2020

Я следую за ответом на этот вопрос: pandas применяется многопроцессорная обработка

Обычно, когда я запускаю функцию для строк в pandas, я делаю что-то вроде

dataframe.apply(lambda row: process(row.attr1, row.attr2, ...))

...

def process(attr1, attr2, ...):
    ...

Но я хочу запустить эту многопоточную функцию. Поэтому я реализовал параллелизацию_on_rows из вышеприведенного вопроса. Однако вышеупомянутое решение работает, потому что переданная функция не принимает параметры. Для функций с параметрами я пытался использовать партиалы. Тем не менее, я не могу понять, как создать партиал, который принимает аргументы из строки, для которой требуется доступ к лямбда-функции.

Вот мой код

def parallelize_function_on_df(self, data, func, num_of_processes=5):
    # refers to the data being split across array sections
    data_split = np.array_split(data, num_of_processes)

    # map a specific function to the array sections
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))

    # must call close before join, research why
    pool.close()
    pool.join()
    return data

def run_on_df_subset(self, func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(self, data, func, num_of_processes=5):
    return self.parallelize_function_on_df(data, partial(self.run_on_df_subset, func), num_of_processes)



def mass_download(some_sql):
    download_table_df = pd.read_sql(some_sql, con=MYSQL.CONNECTION)
    processed_data = {}
    custom_option = True
    process_row_partial = partial(self.process_candidate_row_parallel, processed_data, custom_option)

    parallelize_on_rows(download_table_df, process_row_partial)


def process_candidate_row_parallel(row, processed_data, custom_option=False):
    if row['some_attr'] in processed_data.keys() and processed_data[row['some_attr']] == 'download_successful' and custom_option:
        do_some_other_processing()

    download_single_file(row['some_attr1'], row['some_attr2'], processed_data)



Так что это не ' Это не работает, потому что, как я уже сказал, строка [атрибуты] на самом деле не передается потоку, так как моя часть просто имеет функцию без аргументов. Как мне достичь иного?

...