Я использую скрипт Python 2.7, который обрабатывает многие тысячи файлов и документов, используя 16 процессоров для обработки заданий, поставленных в очередь в JoinableQueue.Мы столкнулись с проблемой, из-за которой некоторые обрабатываемые данные файла / папки повреждены.Кажется, что все работы в конечном итоге завершены, но работа с поврежденными данными занимает очень много времени.Длительная обработка на самом деле происходит во внешней библиотеке, поэтому, как только она начнется, процесс должен дождаться завершения метода библиотеки.
Я не хочу убивать долго выполняющиеся процессы, но когда работа занимает больше 30 секунд или одну минуту, я бы хотел записать сообщение о том, какая работа работает дольше, чем ожидалось.
Основной код заблокирован с помощью queue.join ().Как я могу контролировать состояние обработки?Это лучший подход для запуска асинхронного фонового процесса таймера при каждой обработке задания документа, или есть лучший способ?
Я удалил большую часть кода, но скелет того, что мыделает следующее:
queue = mp.JoinableQueue()
for doc in doclist:
queue.put(doc)
processes = [mp.Process(target=doprocessing, args=[queue]) for i in range(nb_workers)]
for p in processes:
p.start()
queue.join()
for p in processes:
p.terminate()
def doprocessing(queue):
while True:
item = queue.get()
try:
processDoc(item["id"])
except:
pass
queue.task_done()
def processDoc(id):
# Do processing
pass