Когда я использую сельдерей с многократной обработкой, ожидающий поток не может получить сигнал уведомления!
, но когда я запускаю код со сценарием, он работает нормально.
Проблема вызвана плохой поддержкой многопоточности в сельдерее?
Пожалуйста, дайте мне подсказку, если вы можете решить проблему, спасибо!
# tasks.py
@shared_task(bind=True)
def long_time_def(self, *args, **kwargs):
tp = ThreadPool(5)
tp.set_tasks(pv.check_position_effective, list(args))
res = tp.final_results()
while len(res) < len(args):
print(res)
return 'finished'
# ../public/tools.py
class ThreadPool:
def __init__(self, max_thread_num=5):
self.over = False
self.results = []
self.func = None
self.args_list = None
self.task_num = 0
self.max_thread_num = max_thread_num
self.pool = ThreadPoolExecutor(max_workers=max_thread_num)
self.cond = threading.Condition()
def set_tasks(self, func, args_list):
self.task_num = len(args_list)
self.args_list = args_list
self.func = func
def get_result(self, future):
self.results.append(future.result())
if len(self.args_list):
args = self.args_list.pop()
task = self.pool.submit(self.func, *args)
task.add_done_callback(self.get_result)
else:
print("result:%s"%self.results)
while self.task_num != len(self.results):
print(self.results)
time.sleep(1)
print('\n', 'finish')
self.cond.acquire()
############ this place ############
self.cond.notify()
############ this place ############
self.cond.release()
return
def _start_tasks(self):
for i in range(self.max_thread_num):
if len(self.args_list):
args = self.args_list.pop()
task = self.pool.submit(self.func, *args)
task.add_done_callback(self.get_result)
else:
break
def final_results(self):
self._start_tasks()
if self.task_num == len(self.results):
return self.results
else:
# print("main locked")
# self.cond.acquire()
############ this place ############
print("main waiting")
self.cond.wait()
############ this place ############
# print("main released")
# self.cond.release()
print("main finished")
return self.results
output
[2020-04-27 20: 53: 13,962: INFO / MainProcess] Полученное задание: position.tasks.long_time_def [***** - *****]
[2020- 04-27 20: 53: 13,991: WARNING / MainProcess] главное ожидание
[2020-04-27 20: 53: 29,091: WARNING / MainProcess] результат : [1, 1, 1, 1, 1, 1, 1, 1, 1]
[2020-04-27 20: 53: 29,092: WARNING / MainProcess] [1, 1, 1, 1, 1, 1, 1, 1, 1]
[2020-04-27 20: 53: 30,145: WARNING / MainProcess] [1, 1, 1, 1, 1, 1, 1, 1, 1]
[2020-04 -27 20: 53: 30,155: WARNING / MainProcess] результат : [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2020-04-27 20:53 : 30,156: ПРЕДУПРЕЖДЕНИЕ / MainProcess] fini sh* 1 026 *