Запуск задания ETL Airflow путем извлечения дополнительных данных из базы данных. - PullRequest
0 голосов
/ 04 ноября 2019

Я хочу выполнить некоторые преобразования данных, хранящихся в MongoDB, и вставить преобразованные данные в ElasticSearch с помощью Airflow DAG. Хотя я могу написать операторы для извлечения данных и выполнения преобразований, проблема в том, что я хочу запускать группу обеспечения доступности баз данных только тогда, когда новые данные вставлены в коллекцию, и извлекать эти новые данные для преобразования. Я читал о датчиках, но не смог найти конкретного решения. Кто-нибудь может посоветовать, пожалуйста?

РЕДАКТИРОВАТЬ: Я прошу прощения за то, что не объяснил проблему должным образом.

Ниже приведены задачи, которые я должен выполнить: 1. Каждый раз, когда новые документы вставляются в MongoDB, извлекайте документы и выполняйте некоторые преобразования JSON. 2. Передайте преобразованные данные JSON в следующую задачу, в которой выполняется дальнейшая обработка, и вставьте данные в индекс Elasticsearch.

Моя идея получить только новые данные состоит в следующем: 1. Вставить уникальный инкрементный batch_id вместе сданные в MongoDB. 2. Получить максимальный batch_id для предыдущего задания и ткнуть MongoDB для увеличенного batch_id. 3. Если присутствует новый увеличенный batch_id, извлеките данные.

Вышеприведенная идея работает, но я не уверен, что это правильный подход.

Текущий конвейер выглядит следующим образом:

The 1st task is a sensor

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...