Я хочу выполнить некоторые преобразования данных, хранящихся в MongoDB, и вставить преобразованные данные в ElasticSearch с помощью Airflow DAG. Хотя я могу написать операторы для извлечения данных и выполнения преобразований, проблема в том, что я хочу запускать группу обеспечения доступности баз данных только тогда, когда новые данные вставлены в коллекцию, и извлекать эти новые данные для преобразования. Я читал о датчиках, но не смог найти конкретного решения. Кто-нибудь может посоветовать, пожалуйста?
РЕДАКТИРОВАТЬ: Я прошу прощения за то, что не объяснил проблему должным образом.
Ниже приведены задачи, которые я должен выполнить: 1. Каждый раз, когда новые документы вставляются в MongoDB, извлекайте документы и выполняйте некоторые преобразования JSON. 2. Передайте преобразованные данные JSON в следующую задачу, в которой выполняется дальнейшая обработка, и вставьте данные в индекс Elasticsearch.
Моя идея получить только новые данные состоит в следующем: 1. Вставить уникальный инкрементный batch_id вместе сданные в MongoDB. 2. Получить максимальный batch_id для предыдущего задания и ткнуть MongoDB для увеличенного batch_id. 3. Если присутствует новый увеличенный batch_id, извлеките данные.
Вышеприведенная идея работает, но я не уверен, что это правильный подход.
Текущий конвейер выглядит следующим образом: