Использование датчика воздушного потока для запуска DAG - PullRequest
1 голос
/ 08 марта 2019

Я пытаюсь опросить конечную точку HTML API для новых данных и выполнять DAG только при наличии новых данных.

У меня есть датчик со стандартной функцией тычка, который вернет True, если это так.

Мне интересно, можно ли избежать запланированного задания и выполнять его только тогда, когда датчик возвращает значение true? В настоящее время я запускаю DAG ежедневно и через 24 часа тайм-аут датчика (так что есть только один одновременно работающий DAG). Однако, если новые данные поступают два раза в день, им придется ждать обработки следующего прогона DAG.

Ответы [ 2 ]

0 голосов
/ 09 марта 2019

Вы можете использовать 1 DAG, выделенный для зондирования. И используйте другую метку для обработки.


Sensing DAG:

датчик продолжает тыкать -> как только poke () вернет true, используйте TriggerDagRunOperator для запуска Sensing DAG -> используйте TriggerDagRunOperator для запуска Processing DAG


Обработка DAG:

обрабатывайте что хотите

0 голосов
/ 08 марта 2019

Я делаю нечто подобное, я использую Python для контроля датчика вакуума, а также для включения или выключения различных вакуумных насосов в наборе из 6 насосов.Очевидно, что я должен контролировать датчик вакуума как можно ближе к реальному времени.

Я также не хотел загружать наш главный сервер.Итак, я подключил датчик к Raspberry Pi, и он работает Python.У него есть цикл, похожий (но немного более сложный, чем) следующим образом:

    while True:
        time.sleep(0.1)
        bar = current_vacuum()
        write_wtfii_log(bar)
        ......

В коде в write_wtfii_log (bar),

     newdata = json.dumps({'bar': bar,
                        'pump_1': pump_status[1],
                        'pump_1_tor': pump_tor[1],
                        'pump_2': pump_status[2],
                        'pump_2_tor': pump_tor[2],
                        'pump_3': pump_status[3],
                        'pump_3_tor': pump_tor[3],
                        'pump_4': pump_status[4],
                        'pump_4_tor': pump_tor[4],
                        'pump_5': pump_status[5],
                        'pump_5_tor': pump_tor[5],
                        'pump_6': pump_status[6],
                        'pump_6_tor': pump_tor[6]})
url='http://10.0.0.178/flaskr/add_vacuum'
headers = {'Content-Type': 'application/json'}
try:
   response = requests.post(url, data=newdata , headers=headers)
except:
   print('Error trying to write log entry into wtfii')
   pass
return()

Это означает, что мой главный серверсидит счастливо, слушая новые чтения, которые отправляются примерно каждую секунду.Главный сервер запускает колбу с python + sqlite3, где показания регистрируются.Вы можете закодировать это так, что программы на Python отправляют только измененные показания вместо каждого прочтения.

У raspberry pi, на котором запущен этот код Python, средняя загрузка составляет около 0,02 и в настоящее время работает в течение 242 дней с момента последней перезагрузки.

...