Python многопроцессорный map_async зависает - PullRequest
2 голосов
/ 18 марта 2012

У меня есть некоторые проблемы [возможно] с закрытием пула процессов в моем парсере.Когда все задачи выполнены, он зависает и ничего не делает, загрузка процессора составляет около 1%.

profiles_pool = multiprocessing.Pool(processes=4)
pages_pool = multiprocessing.Pool(processes=4)

m = multiprocessing.Manager()
pages = m.list(['URL'])
pages_done = m.list()

while True:
    # grab all links
    res = pages_pool.imap_unordered(deco_process_people, pages, chunksize=1)

    pages_done += pages
    pages = []

    for new_users,new_pages in res:
        users.update(new_users)
        profile_tasks = [ (new_users[i]['link'],i) for i in new_users ]
        # enqueue grabbed links for parsing
        profiles_pool.map_async(deco_process_profiles, 
                                profile_tasks, chunksize=2, 
                                callback=profile_update_callback)
        # i dont need a result of map_async actually
        # callback will apply parsed data to users dict
        # users dict is an instance of Manager.dict()

        for p in new_pages:
            if p not in pages_done and p not in pages:
                pages.append(p)

    # we need more than 900 pages to be parsed for bug occurrence
    #if len(pages) == 0:
    if len(pages_done) > 900:
        break


# 
# closing other pools
#

# ---- the last printed string: 
print 'Closing profiles pool',
sys.stdout.flush()
profiles_pool.close()
profiles_pool.join()
print 'closed'

Полагаю, проблема в неправильном расчете открытых задач в очереди пула, но я не уверен и не могу это проверить - узнать, как получить длину очереди задач.

Что можеттак, а где искать в первую очередь?

Ответы [ 2 ]

1 голос
/ 19 марта 2012

Я выяснил причину ошибки: «метод объединения многопроцессорных объектов пула зависает, если итеративный аргумент pool.map пуст»

http://bugs.python.org/issue12157

1 голос
/ 18 марта 2012

Наиболее очевидная проблема заключается в том, что pages_done является синхронизированным объектом Manager.list (так что каждый процесс может получить к нему атомарный доступ), но хотя pages запускается как один из них, он быстро становится обычным) обработанный список:

pages_done += pages
pages = []

Вторая строка в кавычках связывает pages с новым, пустым обычным списком.

Даже если вы удалили все элементы pages во второй строке(вместо того, чтобы выполнять повторное назначение), вы можете столкнуться с гонкой, в которой (например) pages содержали A, B и C, когда вы выполнили += в первой строке, но стали A, B,C и D. на секунду.

Быстрым решением было бы снимать предметы с pages по одному и помещать их в pages_done по одному (не очень эффективно).Может быть, лучше, чтобы эти структуры данных вообще не были общими;в кавычном коде это не выглядит так, как должно быть (я предполагаю, что от него зависит какой-то не заключенный в кавычки код, поскольку в противном случае повторное связывание pages в любом случае является красной сельдью!).

...