Я пытаюсь помочь коллеге разогнать ноутбук jupiter с некоторой многопроцессорностью. У меня это плохо получается: - /
Что-то не так, программа не возвращается, и я не могу понять почему. Я должен остановить выполнение, а затем я вижу в стеке, что происходит ошибка в _wait_for_tstate_lock.
Согласно выводам моей программы, кажется, что процесс завершен ... Элемент «Нет» обнаружен в queue и while l oop должен закончиться.
Для информации, задание заключается в импорте файлов xlsx, их очистке с помощью panda и объединении их в единый фрейм данных. (1 файл для тестирования, около 80 МБ на диске)
Работал как шарм, когда не использовалась многопроцессорная обработка.
Выходы
Journal_Appels_2019_01.xlsx démarré par Process-86
Journal_Appels_2019_01.xlsx importé en 113.18582201004028 secondes
Fin du processus Process-86
Прерывание стека (клавиатура прерывание)
File "/anaconda/envs/azureml_py36/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
thread.join()
File "/anaconda/envs/azureml_py36/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/anaconda/envs/azureml_py36/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Код
from multiprocessing import Lock, Process, Queue, current_process
import time
def do_job(tasks_to_accomplish, tasks_that_are_done):
while True:
task = tasks_to_accomplish.get_nowait()
if (task is None):
print('Fin du processus ' + current_process().name)
break
print(task + ' démarré par ' + current_process().name)
starttime = time.time()
tasks_that_are_done.put(import_journal(task))
print(task + ' importé en {} secondes'.format(time.time() - starttime))
# setting up
number_of_processes = multiprocessing.cpu_count()
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
processes = []
# read files list in a directory, get only the first one fort testing purpose
for element in os.listdir(path)[:1]:
tasks_to_accomplish.put(element)
# end of process signal
for element in os.listdir(path)[:number_of_processes]:
tasks_to_accomplish.put(None)
# creating processes
for w in range(1):
p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
processes.append(p)
p.start()
# completing process
for p in processes:
p.join()
# concat the outputs
print('Concaténation')
while not tasks_that_are_done.empty():
comptage_jrdemiheureGT_tot = pd.concat([comptage_jrdemiheureGT_tot,tasks_that_are_done.get()],axis=0,sort=True)
print("terminé")