Я использую django-channel 2 для отслеживания хода выполнения нескольких долгоживущих задач - парсинга M файлов.Я посылаю сообщение веб-сокета из внешнего интерфейса и отправляю имена файлов N работникам.M> N (пусть M = 4, N = 3).Во всех экспериментах последняя (4-я) задача выполнялась после всех остальных задач.
Как я могу настроить django-каналы для запуска других задач после выполнения первой задачи?
Вот частькод потребителя django-channel:
class WSConsumer(AsyncWebsocketConsumer):
###skipped connect and disconnect methods
async def receive(self, text_data=None, bytes_data=None):
print('try receive:')
print(text_data)
data=json.loads(text_data)
if (data['type']=='collect'):
await self.handle_collect_message(data['id'], data['filename'])
elif (data['type']=='upload'):
await self.handle_upload_message()
async def handle_collect_message(self, id, filename):
print('collect:',id, filename)
self.tasks.append({
'type':'upload_file',
'reply-channel':self.channel_name,
'id':id,
'filename': filename,
})
async def handle_upload_message(self):
print('start upload:')
for future in asyncio.as_completed(map(lambda value:self.send_task_to_worker(
{
'type':'prepare_file',
'reply-channel':value['reply-channel'],
'id':value['id'],
'filename':value['filename']
}
),self.tasks)):
await future
async def send_task_to_worker(self,task):
print('task: ',task['type'],' id=',task['id'])
await self.channel_layer.send('progress-worker',task)
async def worker_progress(self, message):
if (message['state']=='complete'):
print('task #',message['id'],' complete')
await self.send(text_data=json.dumps(message))
async def worker_prepared(self, message):
id=message['id']
lines=message['lines']
task=list(filter(lambda x : x['id']==id,self.tasks))[0]
task['lines']=lines
self.prepared_tasks.append(task)
if (len(self.prepared_tasks)==len(self.tasks)):
sorted_tasks=sorted(self.prepared_tasks, key=lambda k: k['lines'],reverse=True)
for future in asyncio.as_completed(map(lambda value:self.send_task_to_worker(value),sorted_tasks)):
await future