многопроцессорная обработка: конфликты maxtasksperchild и chunksize? - PullRequest
0 голосов
/ 12 февраля 2020

Я использую модуль multiprocessing в Python 3.7. Мой код не работает должным образом (см. Этот вопрос здесь ). Кто-то предложил установить maxtasksperchild, который я установил на 1. Затем, читая документацию, я решил, что лучше всего также установить chunksize на 1. Это соответствующая часть кода:

# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtaskperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", start)
pool.close()
pool.join()

. Установка только maxtasksperchild или только chunksize позволила выполнить работу в ожидаемое время (для меньшего набора данных, который я использую для проверки кода). Установка обоих просто не закончилась бы sh, и через несколько секунд на самом деле ничего не заработало (я проверил с помощью htop, чтобы увидеть, работают ли ядра).

Вопросы

  1. Есть ли конфликт между maxtasksperchild и chunksize при их объединении?

  2. Они делают одно и то же? maxtasksperchild на уровне Pool() и chunksize на уровне Pool методов?

============== ===============================================

РЕДАКТИРОВАТЬ

Я понимаю, что отладка может быть невозможна из представленного фрагмента кода, пожалуйста, найдите полный код ниже. Модули graph и graphfile - это просто маленькие написанные мной библиотеки, доступные в GitHub . Если вы используете sh для запуска кода, вы можете использовать любой из файлов в каталоге data/ в указанном репозитории GitHub. Короткие тесты лучше запускать, используя F2, но F1 и F3 - те, которые вызывают проблемы в HP C.

import graphfile
import graph
from multiprocessing.pool import Pool
import datetime
import logging


def remove_i_and_f(edges):
    new_edges = dict()
    for k,v in edges.items():
        if 'i' in k:
            continue
        elif 'f' in k:
            key = (k[0],k[0])
            new_edges[key] = v
        else:
            new_edges[k] = v
    return new_edges



if __name__ == "__main__":
    import sys

    # Read data
    # =========
    graph_to_study = sys.argv[1]
    full_path = "/ComplexNetworkEntropy/"
    file = graphfile.GraphFile(full_path + "data/" + graph_to_study + ".txt")
    edges = file.read_edges_from_file()

    # logging
    # =======
    d = datetime.date.today().strftime("%Y_%m_%d")
    log_filename = full_path + "results/" + d + "_probabilities_log_" + graph_to_study + ".log"
    logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s === %(message)s')
    logging.info("Graph to study: %s", graph_to_study)
    logging.info("Date: %s", d)

    # Process data
    # ==============
    edges = remove_i_and_f(edges)
    g = graph.Graph(edges)

    # Parallel Entropy Calculation
    # ============================
    node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
    pool = Pool(maxtasksperchild=1)
    start = datetime.datetime.now()
    logging.info("Start time: %s", start)
    print("Start time: ", start)
    results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
    end = datetime.datetime.now()
    print("End time: ", end)
    print("Run time: ", end - start)
    logging.info("End time: %s", end)
    logging.info("Total run time: %s", start)
    pool.close()
    pool.join()

1 Ответ

0 голосов
/ 12 февраля 2020

maxtasksperchild обеспечивает перезапуск работника после выполнения определенного количества задач. Другими словами, он убивает процесс после выполнения maxtaskperchild итерации заданной вами функции. Он предназначен для сдерживания утечек ресурсов, вызванных плохими реализациями в долго работающих службах.

chunksize группирует заданную коллекцию / итератор в несколько задач. Затем он отправляет по внутреннему каналу всю группу, чтобы уменьшить издержки межпроцессного взаимодействия (IP C). Элементы коллекции по-прежнему будут обрабатываться 1 на 1. chunksize полезно, если у вас большая коллекция мелких элементов, а накладные расходы IP C значительны по отношению к обработке самих элементов. Одним из побочных эффектов является то, что один и тот же процесс будет обрабатывать весь кусок.

Установка обоих параметров в 1 значительно увеличивает скорость процесса и IP C, которые являются достаточно ресурсоемкими, особенно на машинах с большим количеством ядер.

...