Python процесс никогда не возвращается, нужно прервать и, наконец, получить сообщение in _wait_for_tstate_lock - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь помочь коллеге разогнать ноутбук jupiter с некоторой многопроцессорностью. У меня это плохо получается: - /

Что-то не так, программа не возвращается, и я не могу понять почему. Я должен остановить выполнение, а затем я вижу в стеке, что происходит ошибка в _wait_for_tstate_lock.

Согласно выводам моей программы, кажется, что процесс завершен ... Элемент «Нет» обнаружен в queue и while l oop должен закончиться.

Для информации, задание заключается в импорте файлов xlsx, их очистке с помощью panda и объединении их в единый фрейм данных. (1 файл для тестирования, около 80 МБ на диске)

Работал как шарм, когда не использовалась многопроцессорная обработка.

Выходы

Journal_Appels_2019_01.xlsx démarré par Process-86
Journal_Appels_2019_01.xlsx importé en 113.18582201004028 secondes
Fin du processus Process-86

Прерывание стека (клавиатура прерывание)

File "/anaconda/envs/azureml_py36/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
    thread.join()
  File "/anaconda/envs/azureml_py36/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/anaconda/envs/azureml_py36/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Код

from multiprocessing import Lock, Process, Queue, current_process
import time


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
            task = tasks_to_accomplish.get_nowait()
            if (task is None):
                print('Fin du processus ' + current_process().name)
                break

            print(task + ' démarré par ' + current_process().name)
            starttime = time.time()
            tasks_that_are_done.put(import_journal(task))
            print(task + ' importé en {} secondes'.format(time.time() - starttime))


# setting up
number_of_processes = multiprocessing.cpu_count()
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
processes = []

# read files list in a directory, get only the first one fort testing purpose
for element in os.listdir(path)[:1]:
    tasks_to_accomplish.put(element)

# end of process signal
for element in os.listdir(path)[:number_of_processes]:
    tasks_to_accomplish.put(None)

# creating processes
for w in range(1):
    p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
    processes.append(p)
    p.start()

# completing process
for p in processes:
    p.join()

# concat the outputs
print('Concaténation')
while not tasks_that_are_done.empty():
    comptage_jrdemiheureGT_tot = pd.concat([comptage_jrdemiheureGT_tot,tasks_that_are_done.get()],axis=0,sort=True)
print("terminé")
...