Мы рассматриваем возможность использования воздушного потока для замены нашего в настоящее время настраиваемого рабочего процесса на основе RQ, но я не уверен в том, как лучше его спроектировать.Или, если даже имеет смысл использовать воздушный поток.Вариант использования:
- Мы получаем загрузку данных от пользователя.
- Учитывая полученные типы данных, мы по выбору запускаем ноль или более заданий
- Каждое задание выполняетсяесли была получена определенная комбинация типов данных.Он выполняется для этого пользователя в течение периода времени, определенного по полученным данным
- Задание считывает данные из базы данных и записывает результаты в базу данных.
- В результате этих заданий потенциально могут запускаться другие задания.
например,
После загрузки данных мы помещаем элемент в очередь:
загрузка:
user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3
И это сработало бы:
- job1, пользователь 'a', начало: 1, конец: 3
- job2, пользователь'a', начало: 2, конец: 3
, а затем, возможно, в job1 будет выполнено некоторое задание по очистке, которое будет выполняться после него.(Также было бы хорошо, если бы можно было ограничить выполнение заданий только при отсутствии других заданий для того же пользователя.)
Подходы, которые я рассмотрел:
1.
Запуск группы обеспечения доступности баз данных, когда загрузка данных поступает в очередь сообщений.
Затем эта группа обеспечения доступности баз данных определяет, какие зависимые задания выполнять, и передает в качестве аргументов (или xcom) пользователя и диапазон времени.
2.
Запуск группы обеспечения доступности баз данных, когда выгрузка данных поступает в очередь сообщений.
Затем эта группа обеспечения доступности баз данных динамически создает группы обеспечения доступности баз данных для заданий на основе типов данных и шаблонов пользователя и таймфрейма.,
Таким образом, вы получаете динамические группы доступности базы данных для каждого пользователя, задания, комбинированного списка времени.
Я даже не уверен, как вызывать группы доступности базы данных из очереди сообщений ... И это сложночтобы найти примеры, похожие на этот вариант использования.Может быть, это потому, что Airflow не подходит?
Любая помощь / мысли / советы будет принята с благодарностью.
Спасибо.