Оптимизация ORM-запросов Airflow - PullRequest
0 голосов
/ 22 мая 2019

Я использую Airflow для планирования заданий, однако он стал работать медленнее, чем раньше, особенно метод task_stats в views.py: у меня более 400 DAG-ов, а в таблице task_instance 3 миллиона строк. Мне приходится ждать более 40 секунд, чтобы получить ответ от task_stats. Есть ли способы оптимизировать этот метод?

Самая медленная часть — union_all() из RunningTI и LastTI: если убрать RunningTI и оставить в объединении только LastTI, ответ приходит за 5 секунд, но RunningTI необходим фронтенду, чтобы показывать подробную информацию.

Можно ли оптимизировать этот запрос? База данных — MySQL.

Метод task_stats:

@expose('/task_stats')
@login_required
@provide_session
def task_stats(self, session=None):
    """Tally task instances per DAG and per state.

    Two groups of dag runs are considered, both limited to DAGs that are
    active and are not subdags:

    * every dag run whose state is ``State.RUNNING``, and
    * for each DAG, the non-running dag run with the greatest
      execution date.

    The task instances belonging to those runs are combined with
    UNION ALL and counted per ``(dag_id, state)``. The counts are gathered
    into a nested dict ``{dag_id: {state: count}}`` and the session is
    committed; the code visible here ends at that point.

    :param session: database session used for all queries (defaults to
        None; presumably supplied by ``provide_session`` -- confirm).
    """
    task_instance = models.TaskInstance
    dag_run = models.DagRun
    dag_model = models.DagModel

    # Latest execution date among the non-running runs of each DAG.
    newest_execution_date = sqla.func.max(dag_run.execution_date).label('execution_date')
    finished_runs = session.query(dag_run.dag_id, newest_execution_date)
    finished_runs = finished_runs.join(dag_model, dag_model.dag_id == dag_run.dag_id)
    finished_runs = finished_runs.filter(dag_run.state != State.RUNNING)
    finished_runs = finished_runs.filter(dag_model.is_active == True)  # noqa: E712
    finished_runs = finished_runs.filter(dag_model.is_subdag == False)  # noqa: E712
    finished_runs = finished_runs.group_by(dag_run.dag_id)
    last_dag_run = finished_runs.subquery('last_dag_run')

    # Every dag run that is currently running.
    active_runs = session.query(dag_run.dag_id, dag_run.execution_date)
    active_runs = active_runs.join(dag_model, dag_model.dag_id == dag_run.dag_id)
    active_runs = active_runs.filter(dag_run.state == State.RUNNING)
    active_runs = active_runs.filter(dag_model.is_active == True)  # noqa: E712
    active_runs = active_runs.filter(dag_model.is_subdag == False)  # noqa: E712
    running_dag_run = active_runs.subquery('running_dag_run')

    # Task instances are matched to a dag run on (dag_id, execution_date).
    # One branch covers the most recent non-running run of each DAG ...
    last_ti = session.query(
        task_instance.dag_id.label('dag_id'),
        task_instance.state.label('state'),
    ).join(
        last_dag_run,
        and_(
            last_dag_run.c.dag_id == task_instance.dag_id,
            last_dag_run.c.execution_date == task_instance.execution_date,
        ),
    )
    # ... the other branch covers every running dag run.
    running_ti = session.query(
        task_instance.dag_id.label('dag_id'),
        task_instance.state.label('state'),
    ).join(
        running_dag_run,
        and_(
            running_dag_run.c.dag_id == task_instance.dag_id,
            running_dag_run.c.execution_date == task_instance.execution_date,
        ),
    )

    # NOTE: the author observed that keeping only last_ti in the union
    # (i.e. dropping running_ti) makes this query considerably faster.
    union_ti = union_all(last_ti, running_ti).alias('union_ti')

    counts = session.query(
        union_ti.c.dag_id, union_ti.c.state, sqla.func.count()
    ).group_by(union_ti.c.dag_id, union_ti.c.state)

    data = {}
    for dag_id, state, count in counts:
        data.setdefault(dag_id, {})[state] = count
    session.commit()

Связанные модели:

class DagRun(Base, LoggingMixin):
    """
    DagRun describes an instance of a Dag. It can be created
    by the scheduler (for regular runs) or by an external trigger.

    Rows are stored in the ``dag_run`` table. Besides the integer ``id``
    primary key, a row is unique per ``(dag_id, execution_date)`` and per
    ``(dag_id, run_id)``.
    """
    __tablename__ = "dag_run"

    # String prefix and a format template derived from it; the '{0}'
    # placeholder is left for the caller to fill in.
    # NOTE(review): presumably used to build run_id values for scheduled
    # runs -- confirm against the callers.
    ID_PREFIX = 'scheduled__'
    ID_FORMAT_PREFIX = ID_PREFIX + '{0}'

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(ID_LEN))
    # execution_date and start_date both default to timezone.utcnow.
    execution_date = Column(UtcDateTime, default=timezone.utcnow)
    start_date = Column(UtcDateTime, default=timezone.utcnow)
    end_date = Column(UtcDateTime)
    # Python attribute `_state` is mapped to the database column "state";
    # new rows default to State.RUNNING.
    _state = Column('state', String(50), default=State.RUNNING)
    run_id = Column(String(ID_LEN))
    external_trigger = Column(Boolean, default=True)
    # Stored as a pickled value.
    conf = Column(PickleType)

    # Plain class attribute (not a Column), initialised to None.
    dag = None

    __table_args__ = (
        # Composite index over dag_id and the "state" column.
        Index('dag_id_state', dag_id, _state),
        # Uniqueness guarantees described in the class docstring.
        UniqueConstraint('dag_id', 'execution_date'),
        UniqueConstraint('dag_id', 'run_id'),
    )


class TaskInstance(Base, LoggingMixin):
    """
    ORM model for rows of the ``task_instance`` table.

    The primary key is the composite ``(task_id, dag_id, execution_date)``.
    """
    __tablename__ = "task_instance"

    # Composite primary key columns.
    task_id = Column(String(ID_LEN), primary_key=True)
    dag_id = Column(String(ID_LEN), primary_key=True)
    execution_date = Column(UtcDateTime, primary_key=True)
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    duration = Column(Float)
    state = Column(String(20))
    # Python attribute `_try_number` is mapped to the database column
    # "try_number"; defaults to 0.
    _try_number = Column('try_number', Integer, default=0)
    max_tries = Column(Integer)
    hostname = Column(String(1000))
    unixname = Column(String(1000))
    job_id = Column(Integer)
    pool = Column(String(50))
    queue = Column(String(50))
    priority_weight = Column(Integer)
    operator = Column(String(1000))
    queued_dttm = Column(UtcDateTime)
    pid = Column(Integer)
    # Pickled column that uses dill as the pickler.
    executor_config = Column(PickleType(pickler=dill))

    # Secondary indexes; each entry is (index name, indexed columns...).
    __table_args__ = (
        Index('ti_dag_state', dag_id, state),
        Index('ti_dag_date', dag_id, execution_date),
        Index('ti_state', state),
        Index('ti_state_lkp', dag_id, task_id, execution_date, state),
        Index('ti_pool', pool, state, priority_weight),
        Index('ti_job_id', job_id),
    )

class DagModel(Base):
    """
    ORM model for rows of the ``dag`` table.

    These items are stored in the database for state related information.
    """

    # The docstring above must be the first statement of the class body;
    # placed after ``__tablename__`` it would be a discarded string
    # expression and ``DagModel.__doc__`` would not be set from it.
    __tablename__ = "dag"

    dag_id = Column(String(ID_LEN), primary_key=True)
    # A DAG can be paused from the UI / DB.
    # The default for is_paused comes from the [core]
    # dags_are_paused_at_creation configuration option, which is read once,
    # when this class body is executed.
    is_paused_at_creation = configuration.conf.getboolean('core', 'dags_are_paused_at_creation')
    is_paused = Column(Boolean, default=is_paused_at_creation)
    # Whether the DAG is a subdag
    is_subdag = Column(Boolean, default=False)
    # Whether that DAG was seen on the last DagBag load
    is_active = Column(Boolean, default=False)
    # Last time the scheduler started
    last_scheduler_run = Column(UtcDateTime)
    # Last time this DAG was pickled
    last_pickled = Column(UtcDateTime)
    # Time when the DAG last received a refresh signal
    # (e.g. the DAG's "refresh" button was clicked in the web UI)
    last_expired = Column(UtcDateTime)
    # Whether (one of) the scheduler(s) is scheduling this DAG at the moment
    scheduler_lock = Column(Boolean)
    # Foreign key to the latest pickle_id
    pickle_id = Column(Integer)
    # The location of the file containing the DAG object
    fileloc = Column(String(2000))
    # String representing the owners
    owners = Column(String(2000))
    # Description of the dag
    description = Column(Text)
    # Default view of the DAG inside the webserver
    default_view = Column(String(25))
    # Schedule interval
    schedule_interval = Column(Interval)

Ссылка на github: https://github.com/apache/airflow/blob/master/airflow/www/views.py

...