Я использую модуль multiprocessing
в Python 3.7
. Мой код не работает должным образом (см. Этот вопрос здесь ). Кто-то предложил установить maxtasksperchild
, который я установил на 1. Затем, читая документацию, я решил, что лучше всего также установить chunksize
на 1. Это соответствующая часть кода:
# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtaskperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", start)
pool.close()
pool.join()
. Установка только maxtasksperchild
или только chunksize
позволила выполнить работу в ожидаемое время (для меньшего набора данных, который я использую для проверки кода). Установка обоих просто не закончилась бы sh, и через несколько секунд на самом деле ничего не заработало (я проверил с помощью htop
, чтобы увидеть, работают ли ядра).
Вопросы
Есть ли конфликт между maxtasksperchild
и chunksize
при их объединении?
Они делают одно и то же? maxtasksperchild
на уровне Pool()
и chunksize
на уровне Pool
методов?
============== ===============================================
РЕДАКТИРОВАТЬ
Я понимаю, что отладка может быть невозможна из представленного фрагмента кода, пожалуйста, найдите полный код ниже. Модули graph
и graphfile
- это просто маленькие написанные мной библиотеки, доступные в GitHub . Если вы используете sh для запуска кода, вы можете использовать любой из файлов в каталоге data/
в указанном репозитории GitHub. Короткие тесты лучше запускать, используя F2, но F1 и F3 - те, которые вызывают проблемы в HP C.
import graphfile
import graph
from multiprocessing.pool import Pool
import datetime
import logging
def remove_i_and_f(edges):
new_edges = dict()
for k,v in edges.items():
if 'i' in k:
continue
elif 'f' in k:
key = (k[0],k[0])
new_edges[key] = v
else:
new_edges[k] = v
return new_edges
if __name__ == "__main__":
import sys
# Read data
# =========
graph_to_study = sys.argv[1]
full_path = "/ComplexNetworkEntropy/"
file = graphfile.GraphFile(full_path + "data/" + graph_to_study + ".txt")
edges = file.read_edges_from_file()
# logging
# =======
d = datetime.date.today().strftime("%Y_%m_%d")
log_filename = full_path + "results/" + d + "_probabilities_log_" + graph_to_study + ".log"
logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s === %(message)s')
logging.info("Graph to study: %s", graph_to_study)
logging.info("Date: %s", d)
# Process data
# ==============
edges = remove_i_and_f(edges)
g = graph.Graph(edges)
# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtasksperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", start)
pool.close()
pool.join()