При использовании сельдерея с многократной обработкой сигнал уведомления не может быть получен ожидающим потоком - PullRequest
0 голосов
/ 27 апреля 2020

Когда я использую сельдерей с многократной обработкой, ожидающий поток не может получить сигнал уведомления!

, но когда я запускаю код со сценарием, он работает нормально.

Проблема вызвана плохой поддержкой многопоточности в сельдерее?

Пожалуйста, дайте мне подсказку, если вы можете решить проблему, спасибо!

# tasks.py

@shared_task(bind=True)
def long_time_def(self, *args, **kwargs):
    tp = ThreadPool(5)
    tp.set_tasks(pv.check_position_effective, list(args))
    res = tp.final_results()
    while len(res) < len(args):
        print(res)
    return 'finished'
# ../public/tools.py

class ThreadPool:
    def __init__(self, max_thread_num=5):
        self.over = False
        self.results = []

        self.func = None
        self.args_list = None
        self.task_num = 0
        self.max_thread_num = max_thread_num

        self.pool = ThreadPoolExecutor(max_workers=max_thread_num)
        self.cond = threading.Condition()

    def set_tasks(self, func, args_list):
        self.task_num = len(args_list)
        self.args_list = args_list
        self.func = func


    def get_result(self, future):
        self.results.append(future.result())
        if len(self.args_list):
            args = self.args_list.pop()
            task = self.pool.submit(self.func, *args)
            task.add_done_callback(self.get_result)
        else:
            print("result:%s"%self.results)
            while self.task_num != len(self.results):
                print(self.results)
                time.sleep(1)
            print('\n', 'finish')
            self.cond.acquire()

            ############ this place ############
            self.cond.notify()
            ############ this place ############

            self.cond.release()
            return

    def _start_tasks(self):
        for i in range(self.max_thread_num):

            if len(self.args_list):
                args = self.args_list.pop()
                task = self.pool.submit(self.func, *args)
                task.add_done_callback(self.get_result)
            else:
                break

    def final_results(self):
        self._start_tasks()
        if self.task_num == len(self.results):
            return self.results
        else:
            # print("main locked")
            # self.cond.acquire()

            ############ this place ############
            print("main waiting")
            self.cond.wait()
            ############ this place ############

            # print("main released")
            # self.cond.release()
            print("main finished")
            return self.results

output

[2020-04-27 20: 53: 13,962: INFO / MainProcess] Полученное задание: position.tasks.long_time_def [***** - *****]

[2020- 04-27 20: 53: 13,991: WARNING / MainProcess] главное ожидание

[2020-04-27 20: 53: 29,091: WARNING / MainProcess] результат : [1, 1, 1, 1, 1, 1, 1, 1, 1]

[2020-04-27 20: 53: 29,092: WARNING / MainProcess] [1, 1, 1, 1, 1, 1, 1, 1, 1]

[2020-04-27 20: 53: 30,145: WARNING / MainProcess] [1, 1, 1, 1, 1, 1, 1, 1, 1]

[2020-04 -27 20: 53: 30,155: WARNING / MainProcess] результат : [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

[2020-04-27 20:53 : 30,156: ПРЕДУПРЕЖДЕНИЕ / MainProcess] fini sh* 1 026 *

1 Ответ

0 голосов
/ 27 апреля 2020

Celery использует Billiard для управления процессами, поэтому на данный момент вы должны использовать Billiard вместо многопроцессорных.

...