Composer (Airflow) Конфликт DAG RunID в GCP - PullRequest
0 голосов
/ 01 августа 2020

У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.

Проблема в том, что когда несколько файлов помещаются в одно и то же время за секунду, при вызове функции возникает ошибка, приведенная ниже:

b '{ "error": "Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag"} \ n '

Чтобы решить эту проблему, мы передаем run_id как' run_id ': 'IMAOC_31072020201842766625', дата в миллисекундах.

Код:

dag_name = environ_vars['imaoc_meta_dag']
    webserver_url = (
        webserver_id
        + '/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )

    print('webserver_url: {}'.format(webserver_url))
    data['run_id'] = _datetime.datetime.now().strftime(**"IMAOC_%d%m%Y%H%M%S%f"**)
    resp = map_iap_request(webserver_url,client_id,method = 'POST',json = data)
    print('response text:{}'.format(resp))

Но все еще не решен, и AIRFLOW_CTX_DAG_RUN_ID идет как «manual__2020-07-31T20: 18: 43+ 00:00 "формат ....

Не знаю, что делать, чтобы убрать этот конфликт и запустить DAG, если файл приходит в ту же секунду.

...