У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.
Проблема в том, что когда несколько файлов помещаются в одно и то же время за секунду, при вызове функции возникает ошибка, приведенная ниже:
b '{ "error": "Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag"} \ n '
Чтобы решить эту проблему, мы передаем run_id как' run_id ': 'IMAOC_31072020201842766625', дата в миллисекундах.
Код:
dag_name = environ_vars['imaoc_meta_dag']
webserver_url = (
webserver_id
+ '/api/experimental/dags/'
+ dag_name
+ '/dag_runs'
)
print('webserver_url: {}'.format(webserver_url))
data['run_id'] = _datetime.datetime.now().strftime(**"IMAOC_%d%m%Y%H%M%S%f"**)
resp = map_iap_request(webserver_url,client_id,method = 'POST',json = data)
print('response text:{}'.format(resp))
Но все еще не решен, и AIRFLOW_CTX_DAG_RUN_ID идет как «manual__2020-07-31T20: 18: 43+ 00:00 "формат ....
Не знаю, что делать, чтобы убрать этот конфликт и запустить DAG, если файл приходит в ту же секунду.