Я использую AirFlow
для планирования заданий, однако это становится медленнее, чем раньше, особенно для task_stat
метода в views.py
, у меня более 400 даг и в таблице task_instance
3 миллиона строк.Мне нужно подождать более 40 секунд, чтобы получить ответ task_stat
. Есть ли способы оптимизировать этот метод?
union_all()
из RunningTI
и LastTI
- самый медленныйи если я удаляю RunningTI
и сохраняю LastTI
только при объединении результата, я мог бы получить ответ через 5 секунд, но RunningTI
необходим для внешнего интерфейса, чтобы показать подробную информацию.
Делает этоможно оптимизировать этот запрос?База данных MySQL.
метод task_stat:
@expose('/task_stats')
@login_required
@provide_session
def task_stats(self, session=None):
TI = models.TaskInstance
DagRun = models.DagRun
Dag = models.DagModel
LastDagRun = (
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
)
RunningDagRun = (
session.query(DagRun.dag_id, DagRun.execution_date)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
.subquery('running_dag_run')
)
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
LastTI = (
session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
.join(LastDagRun, and_(
LastDagRun.c.dag_id == TI.dag_id,
LastDagRun.c.execution_date == TI.execution_date))
)
RunningTI = (
session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
.join(RunningDagRun, and_(
RunningDagRun.c.dag_id == TI.dag_id,
RunningDagRun.c.execution_date == TI.execution_date))
)
UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
# if I remove RunningTi in union_all(), and change line below to
# UnionTI = union_all(LastTI).alias('union_ti'), it could save a lot of time
qry = (
session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
.group_by(UnionTI.c.dag_id, UnionTI.c.state)
)
data = {}
for dag_id, state, count in qry:
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
session.commit()
Связанные модели:
class DagRun(Base, LoggingMixin):
"""
DagRun describes an instance of a Dag. It can be created
by the scheduler (for regular runs) or by an external trigger
"""
__tablename__ = "dag_run"
ID_PREFIX = 'scheduled__'
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(UtcDateTime, default=timezone.utcnow)
start_date = Column(UtcDateTime, default=timezone.utcnow)
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=True)
conf = Column(PickleType)
dag = None
__table_args__ = (
Index('dag_id_state', dag_id, _state),
UniqueConstraint('dag_id', 'execution_date'),
UniqueConstraint('dag_id', 'run_id'),
)
class TaskInstance(Base, LoggingMixin):
__tablename__ = "task_instance"
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
duration = Column(Float)
state = Column(String(20))
_try_number = Column('try_number', Integer, default=0)
max_tries = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
job_id = Column(Integer)
pool = Column(String(50))
queue = Column(String(50))
priority_weight = Column(Integer)
operator = Column(String(1000))
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))
__table_args__ = (
Index('ti_dag_state', dag_id, state),
Index('ti_dag_date', dag_id, execution_date),
Index('ti_state', state),
Index('ti_state_lkp', dag_id, task_id, execution_date, state),
Index('ti_pool', pool, state, priority_weight),
Index('ti_job_id', job_id),
)
class DagModel(Base):
__tablename__ = "dag"
"""
These items are stored in the database for state related information
"""
dag_id = Column(String(ID_LEN), primary_key=True)
# A DAG can be paused from the UI / DB
# Set this default value of is_paused based on a configuration value!
is_paused_at_creation = configuration.conf\
.getboolean('core',
'dags_are_paused_at_creation')
is_paused = Column(Boolean, default=is_paused_at_creation)
# Whether the DAG is a subdag
is_subdag = Column(Boolean, default=False)
# Whether that DAG was seen on the last DagBag load
is_active = Column(Boolean, default=False)
# Last time the scheduler started
last_scheduler_run = Column(UtcDateTime)
# Last time this DAG was pickled
last_pickled = Column(UtcDateTime)
# Time when the DAG last received a refresh signal
# (e.g. the DAG's "refresh" button was clicked in the web UI)
last_expired = Column(UtcDateTime)
# Whether (one of) the scheduler is scheduling this DAG at the moment
scheduler_lock = Column(Boolean)
# Foreign key to the latest pickle_id
pickle_id = Column(Integer)
# The location of the file containing the DAG object
fileloc = Column(String(2000))
# String representing the owners
owners = Column(String(2000))
# Description of the dag
description = Column(Text)
# Default view of the inside the webserver
default_view = Column(String(25))
# Schedule interval
schedule_interval = Column(Interval)
Ссылка на github: https://github.com/apache/airflow/blob/master/airflow/www/views.py