Воздушный поток: ошибка целостности MySQL дубликатов при запуске прогона DAG - PullRequest
3 голосов
/ 15 мая 2019

У меня есть две DAG Airflow - планировщик и рабочий. Планировщик запускается каждую минуту и ​​опрашивает новые задания агрегации и запускает рабочие задания. Вы можете найти код для работы планировщика ниже.

Однако из более чем 6000 заданий планировщика выполнено 30 сбоев, за исключением следующего:

[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, '', 'airflow', None, None, 'default', 1, None, None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')]
Traceback (most recent call last)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_contex
    context
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execut
    cursor.execute(statement, parameters
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execut
    res = self._query(query
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _quer
    db.query(q
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in quer
    _mysql.connection.query(self, query
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'"

Учитывая, что подавляющее большинство запусков планировщика в порядке, я наполовину склонен сказать, что это какое-то состояние гонки в Airflow при использовании ручного / внешнего триггера.

Кто-нибудь сталкивался с подобной проблемой?

class SchedulerOperator(BaseOperator):
    def __init__(self, **kwargs):
        super(SchedulerOperator, self).__init__(**kwargs)

    def execute(self, context):
        current_time = pytz.utc.localize(datetime.utcnow())
        execution_time = current_time.replace(microsecond=0)

        meta_service = MetaServiceIntegration()
        jobs = meta_service.poll_for_jobs()

        for job in jobs:
            run_id = "{0}-{1}-{2}_{3}".format(job["cdn"], job["dist_id"], job["iso_date"], execution_time.strftime(
                '%Y-%m-%dT%H:%M:%SZ'))
            self.log.info("Scheduling DAG {0}".format(run_id))
            conf = json.dumps({
                'job_id': job["job_id"],
                "src_version": job['src_version'],
                'cdn': job["cdn"],
                'dist_id': job["dist_id"],
                'iso_date': job["iso_date"]})
            self.log.info("DAG config {0}".format(conf))

            trigger_dag(
                dag_id='worker',
                run_id=run_id,
                conf=conf,
                execution_date=execution_time
            )

            # increment by 1 sec to guarantee unique execution times for the consecutive jobs
            execution_time = execution_time + timedelta(seconds=1)

with DAG(
        dag_id="scheduler",
        start_date=datetime(2019, 1, 1, 0, 0, 0, 0),
        schedule_interval="* * * * *",  # runs every minute
        default_view="graph",
        orientation="LR",
        concurrency=5,
        max_active_runs=1,
        catchup=False
) as dag:
    node = SchedulerOperator(
        task_id="schedule",
        dag=dag
    )

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...