Python: многопроцессорный код очень медленный - PullRequest
1 голос
/ 16 апреля 2019

Я извлекаю .8 миллион записей за один раз (это однократный процесс) из mongodb с использованием pymongo и выполняю над ним какую-то операцию.

Мой код выглядит так, как показано ниже.

    proc = []
    for rec in cursor: # cursor has .8 million rows 
            print cnt
            cnt = cnt + 1
            url =  rec['urlk']
            mkptid = rec['mkptid']
            cii = rec['cii']

            #self.process_single_layer(url, mkptid, cii)


            proc = Process(target=self.process_single_layer, args=(url, mkptid, cii))
            procs.append(proc)
            proc.start()

             # complete the processes
    for proc in procs:
        proc.join()

process_single_layer - это функция, которая в основном загружает urls .из облака и хранит локально.

Теперь проблема в том, что процесс загрузки идет медленно, так как он должен попасть в URL.А так как записи огромны для обработки 1 тыс. Строк, это занимает 6 минут.

Чтобы сократить время, которое я хотел реализовать Multiprocessing.Но трудно увидеть разницу с приведенным выше кодом.

Пожалуйста, предложите мне, как я могу улучшить производительность в этом сценарии.

Ответы [ 2 ]

0 голосов
/ 16 апреля 2019

В этом сценарии проще использовать пул.

Очереди не нужны, так как вам не нужно взаимодействовать между вашими порожденными процессами. Мы можем использовать Pool.map для распределения рабочей нагрузки.

Pool.imap или Pool.imap_unordered могут быть быстрее с большим размером чанка. (Ссылка: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap) Вы можете использовать Pool.starmap, если хотите, и избавиться от распаковки кортежей.

from multiprocessing import Pool

def process_single_layer(data):
    # unpack the tuple and do the processing
    url, mkptid, cii = data
    return "downloaded" + url

def get_urls():
    # replace this code: iterate over cursor and yield necessary data as a tuple
    for rec in range(8): 
            url =  "url:" + str(rec)
            mkptid = "mkptid:" + str(rec)
            cii = "cii:" + str(rec)
            yield (url, mkptid, cii)

#  you can come up with suitable process count based on the number of CPUs.
with Pool(processes=4) as pool:
    print(pool.map(process_single_layer, get_urls()))
0 голосов
/ 16 апреля 2019

Прежде всего вам нужно сосчитать все строки в вашем файле, а затем создать фиксированное количество процессов (в идеале совпадающее с количеством ядер вашего процессора), которым вы передаете через очереди (по одной для каждого процесса) количество строк, равных делению total_number_of_rows / number_of_cores. Идея этого подхода заключается в том, что вы разделяете обработку этих строк между несколькими процессами, что обеспечивает параллелизм.

Чтобы узнать количество ядер динамически, выполните:

import multiprocessing as mp
cores_count = mp.cpu_count()

Небольшое улучшение, которое может быть достигнуто за счет исключения начального числа строк, заключается в циклическом добавлении строки путем создания списка очередей и последующем применении к нему итератора цикла.

Полный пример:

import queue
import multiprocessing as mp
import itertools as itools

cores_count = mp.cpu_count()


def dosomething(q):

    while True:

        try:
            row = q.get(timeout=5)
        except queue.Empty:
            break

    # ..do some processing here with the row

    pass

if __name__ == '__main__':
    processes
    queues = []

    # spawn the processes
    for i in range(cores_count):
        q = mp.Queue()
        queues.append(q)
        proc = Process(target=dosomething, args=(q,))
        processes.append(proc)

    queues_cycle = itools.cycle(queues)
    for row in cursor:
        q = next(queues_cycle)
        q.put(row)

    # do the join after spawning all the processes
    for p in processes:
        p.join()
...