Python: многопоточное копирование - потоки еще живы - PullRequest
0 голосов
/ 20 ноября 2018

Здравствуйте, моя проблема в том, что у меня есть класс многопоточного копирования.И копирование работает хорошо, но программа не завершается, потому что потоки все еще живы после копирования.Я пытался встроить событие потока, но это не имеет никакого эффекта.T.join () никогда не заканчивается, потому что потоки живы.Я также сделал их демоническими, но это нежелательно, потому что программа заканчивается, но потоки все еще живы, когда программа останавливается.Кто-нибудь знает, что здесь не так?Входные данные класса - это фрейм данных с источником файла в первом столбце и местом назначения файла в другом столбце

import shutil as sh
from multiprocessing import Queue, Process, Value, Lock, cpu_count 
import threading, os, time,queue


class ThreadedCopy():


totalFiles = 0
copyCount = 0
lock = threading.Lock()

def __init__(self,srcDst):        
    #fileList = srcDst['srcCol']
    self.fileQueue = queue.Queue()
    self.totalFiles = srcDst.shape[0]

    print(str(self.totalFiles) + " files to copy.")
    self.threadWorkerCopy(srcDst)


def CopyWorker(self):
    while True:
    #while True:
        fileRow = self.fileQueue.get()
        sh.copyfile(fileRow[1], fileRow[2])

        self.fileQueue.task_done()
        with self.lock:
            self.copyCount += 1
            percent = (self.copyCount * 100) / self.totalFiles
            if (percent%10==0):
                print(str(percent) + " percent copied.")

def threadWorkerCopy(self, srcDst):
    threads=[]
    for fileRow in srcDst.itertuples():
        self.fileQueue.put(fileRow)
    for i in range(cpu_count()):
        t = threading.Thread(target=self.CopyWorker,name='CopyThread')            
        t.daemon = True
        t.start()
        #threads.append(t)

    self.fileQueue.join()

ThreadedCopy(scrDstDf)

РЕДАКТИРОВАТЬ

Если я нажимаю клавишу для прерывания программыЗдесь висит:

<ipython-input-14-8d9a9b84e73f> in threadWorkerCopy(self, srcDst)
    380         self.stop_event.set()
    381         for thread in threads:
--> 382             thread.join()
    383 
    384 #ThreadedCopy(scrDstDf)

/usr/lib/python3.5/threading.py in join(self, timeout)
   1052 
   1053         if timeout is None:
-> 1054             self._wait_for_tstate_lock()
   1055         else:
   1056             # the behavior of a negative timeout isn't documented, but

/usr/lib/python3.5/threading.py in _wait_for_tstate_lock(self, block, timeout)
   1068         if lock is None:  # already determined that the C code is done
   1069             assert self._is_stopped
-> 1070         elif lock.acquire(block, timeout):
   1071             lock.release()
   1072             self._stop()

KeyboardInterrupt: 

1 Ответ

0 голосов
/ 20 ноября 2018

Ваш рабочий поток заблокирован на self.fileQueue.get(), поэтому он не проверяет событие остановки.

Самый простой способ решить эту проблему - сделать поток потоком демона.Таким образом, они будут автоматически завершены, когда завершится основной поток.

Если по какой-то причине вы не хотите / не можете этого сделать, вам нужно будет разбудить рабочий поток, вставивспециальное значение маркера для очереди, которое проверяет ваш работник; если работник видит это значение в очереди, оно должно завершиться само собой.

...