Итак, у меня есть этот довольно хороший DAG в потоке воздуха, который в основном выполняет несколько этапов анализа (реализованных как плагины воздушного потока) для двоичных файлов.DAG запускается датчиком ftp, который просто проверяет наличие нового файла на сервере ftp, а затем запускает весь рабочий процесс.
Таким образом, в настоящее время рабочий процесс выглядит следующим образом: DAG запускается в соответствии с определением -> датчик ожидает новый файл на ftp -> выполняются шаги анализа -> конец рабочего процесса.
То, что я хотел бы иметь, выглядит примерно так: DAG - это триггеры -> датчик ждет нового файла на ftp -> для каждого файла на ftp этапы анализа выполняются индивидуально -> каждый рабочий процесс заканчивается индивидуально,
Как заставить рабочий процесс анализа выполняться для каждого файла на ftp-сервере, и если на сервере нет файла, только один датчик должен ожидать новый файл?Я не хочу, например, запускать группу DAG каждую секунду или около того, потому что тогда у меня есть много датчиков, просто ожидающих новый файл.