Как работать с django-channel 2, если количество долгоживущих заданий больше числа работников? - PullRequest
1 голос
/ 23 марта 2019

Я использую django-channel 2 для отслеживания хода выполнения нескольких долгоживущих задач - парсинга M файлов.Я посылаю сообщение веб-сокета из внешнего интерфейса и отправляю имена файлов N работникам.M> N (пусть M = 4, N = 3).Во всех экспериментах последняя (4-я) задача выполнялась после всех остальных задач.

Как я могу настроить django-каналы для запуска других задач после выполнения первой задачи?

Вот частькод потребителя django-channel:

class WSConsumer(AsyncWebsocketConsumer):
###skipped connect and disconnect methods

    async def receive(self, text_data=None, bytes_data=None):
        print('try receive:')
        print(text_data)
        data=json.loads(text_data)
        if (data['type']=='collect'):
            await self.handle_collect_message(data['id'], data['filename'])
        elif (data['type']=='upload'):
            await self.handle_upload_message()

    async def handle_collect_message(self, id, filename):
        print('collect:',id, filename)
        self.tasks.append({
                'type':'upload_file', 
                'reply-channel':self.channel_name, 
                'id':id,
                'filename': filename,
            })

    async def handle_upload_message(self):
        print('start upload:')
        for future in asyncio.as_completed(map(lambda value:self.send_task_to_worker(
                {
                    'type':'prepare_file',
                    'reply-channel':value['reply-channel'],
                    'id':value['id'],
                    'filename':value['filename']
                }
            ),self.tasks)):
            await future


    async def send_task_to_worker(self,task):
        print('task: ',task['type'],' id=',task['id'])
        await self.channel_layer.send('progress-worker',task)   

    async def worker_progress(self, message):
        if (message['state']=='complete'):
            print('task #',message['id'],' complete')
        await self.send(text_data=json.dumps(message))

    async def worker_prepared(self, message):
        id=message['id']
        lines=message['lines']
        task=list(filter(lambda x : x['id']==id,self.tasks))[0]
        task['lines']=lines
        self.prepared_tasks.append(task)
        if (len(self.prepared_tasks)==len(self.tasks)):
            sorted_tasks=sorted(self.prepared_tasks, key=lambda k: k['lines'],reverse=True)

            for future in asyncio.as_completed(map(lambda value:self.send_task_to_worker(value),sorted_tasks)):
                await future

...