Многопроцессорный пул потоков возвращает нулевой результат - PullRequest
0 голосов
/ 09 ноября 2018

Список Tup является подмножеством огромного набора данных. Я пытался использовать многопоточность, чтобы сократить время вычислений. Но список dfsi дает нулевой результат?

dfsi = list[]

tup = [(28075,69),(28075,72),(28075,73),(28075,76),(28075,96),(28075,99), 
(28075,102),(28075,103),(28075,162),(28075,165)]

from multiprocessing.pool import ThreadPool 

def multi_processing_tuples(sku,ids):
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids)), 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    dfsi.append((sku,ids,Q0,Q4))

pool_size = 5
pool = ThreadPool(pool_size)

for (sku,ids) in tup:
    pool.apply_async(multi_processing_tuples, ((sku,ids),))

pool.close()
pool.join()

РЕДАКТИРОВАТЬ:

dfsi = list[]

tup = [(28075,69),(28075,72),(28075,73),(28075,76),(28075,96),(28075,99), 
(28075,102),(28075,103),(28075,162),(28075,165)]

from multiprocessing.pool import ThreadPool 

def multi_processing_tuples(sku,ids):
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids)), 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    return(sku,ids,Q0,Q4)

pool_size = 5
pool = ThreadPool(pool_size)

for (sku,ids) in tup:
    dfsi.append(pool.apply_async(multi_processing_tuples, ((sku,ids),)))

pool.close()
pool.join()

Я получаю вывод dfsi как.

[<multiprocessing.pool.ApplyResult at 0x1f707d7d9b0>,
<multiprocessing.pool.ApplyResult at 0x1f707d7d748>,
<multiprocessing.pool.ApplyResult at 0x1f707d7d710>,
<multiprocessing.pool.ApplyResult at 0x1f707d7dda0>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e0f0>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e358>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e320>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e6a0>,
<multiprocessing.pool.ApplyResult at 0x1f707d936d8>,
<multiprocessing.pool.ApplyResult at 0x1f707d93eb8>]

как я могу увидеть реальный результат?

1 Ответ

0 голосов
/ 09 ноября 2018

Когда вы создаете новый поток, вы делитесь данными из вашего исходного процесса, но когда вы пытаетесь изменить эти данные, они копируются. Когда вы закрываете эту ветку, вы НЕ копируете что-либо обратно. Вам необходимо явно вернуть свои результаты и затем обработать их в родительском.

def multi_processing_tuples(skid):
    sku,ids = skid
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids)), 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    return (sku,ids,Q0,Q4)

for data in pool.imap(multi_processing_tuples,tup):
    dfsi.append(data)

Выполнение этого вернет данные из multi_processing_tuples, но вы, вероятно, также должны передать df в качестве аргумента.

РЕДАКТИРОВАТЬ: Кроме того, как правило, вы не должны использовать темы для этого; если вы пытаетесь улучшить время выполнения процесса, интенсивно использующего процессор, вам следует использовать пул процессов. Многопоточность поможет в процессах с интенсивным вводом-выводом.

...