Как использовать Airflow для обработки новых данных? - PullRequest
1 голос
/ 19 марта 2020

мы хотим использовать Airflow для обработки пакетных новых данных. Сначала наш dag запускает команду для проверки нашей CRM-системы на наличие новых данных каждые 15 минут, а затем переносит новые данные в две другие системы, так что это выглядит так:

задача 1 (проверьте, есть ли новые данные)> задача 2 (отправить новые данные в систему1)> задача 3 (отправить новые данные в system2)

Проблема в том, что

  1. числа новых данных являются динамическими c, мы не знаем, сколько данных мы можем получить.
  2. как портировать новые данные один за другим?

1 Ответ

0 голосов
/ 07 апреля 2020

Я не уверен, с какой проблемой вы сталкиваетесь. Пожалуйста, будьте более точны c. Лучше всего создать пользовательский оператор (если его нет по умолчанию).

Task1 (извлечение новой записи данных в местоположение [Экспортировать как nd json или другие форматы])> Task2 (проверяет, если имеются какие-либо данные (если местоположение является динамическим c, передайте его через xcom))> Task3 (то же самое, что и задача 2 (местоположение может быть передано как xcom)) *

Каждый запуск, запускаемый каждые 15 минут, должен вызывать новый данные и пу sh

...