Создать динамический пул в Airflow - PullRequest
0 голосов
/ 20 сентября 2018

У меня есть группа обеспечения доступности баз данных, которая создает кластер, запускает вычислительные задачи и после их завершения разрушает кластер.Я хочу ограничить параллелизм для вычислительных задач, выполняемых на этом кластере, фиксированным числом.Логично, что мне нужен пул, который является эксклюзивным для кластера, созданного задачей.Я не хочу вмешательства в другие группы обеспечения доступности баз данных или другие прогоны одного и того же уровня доступности базы данных.

Я думал, что смогу решить эту проблему, динамически создав пул из задачи после создания кластера и удалив его после выполнения вычислительных задач.закончены.Я думал, что смогу шаблонировать параметр pool задач вычислений, чтобы заставить их использовать этот динамически созданный кластер.

# execute registers a pool and returns with the pool name
create_pool = CreatePoolOperator(
    slots=4,
    task_id='create_pool',
    dag=self
)

# the pool parameter is templated
computation = ComputeOperator(
    task_id=compute_subtask_name,
    pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
    dag=self
)

create_pool >> computation

Но таким образом задачи вычислений никогда не будут запускаться.Таким образом, я думаю, что параметр пула сохраняется в экземпляре задачи перед тем, как быть шаблонным.Я хотел бы услышать ваши мысли о том, как добиться желаемого поведения.

Ответы [ 3 ]

0 голосов
/ 25 апреля 2019

Этот ответ, вероятно, усугубит некоторых, но, тем не менее, это один из возможных путей, поэтому его стоит документировать.Основная особенность, которая делает Airflow более мощным, чем его конкуренты, заключается в том, что все определяется с помощью кода.В конце концов, если Airflow не предоставляет нам функцию, мы всегда можем просто создать эту функцию самостоятельно, используя Python.

Вы хотите иметь возможность объединять задачи в группу DAG, но только для этого конкретного запуска DAG.Поэтому постарайтесь просто создать собственный пул для ваших задач.Вот какой-то псевдокод на макушке моей головы

List<String> tasksPoolQueue = new ArrayList<String>();

def taskOnesFunction() 

  while true:

    if tasksPoolQueue.get(0) == "taskOnesTurn":
       print("Do some work it's your turn")

       # Delete this run from the list and shift the list over to the left one index
       # So that the next value is now the first value in the list
       tasksPoolQueue.delete(0)

       return 0

    else:
      sleep(10 seconds)

def taskTwosFunction()

  while true:

    if tasksPoolQueue.get(0) == "taskTwosTurn":
       print("Do some work it's your turn")

       # Delete this run from the list and shift the list over to the left one index
       # So that the next value is now the first value in the list
       tasksPoolQueue.delete(0)

       return 0

    else:
      sleep(10 seconds)

def createLogicalOrderingOfTaskPoolQueue():

    if foobar == true:
      tasksPoolQueue[0] = "taskOnesTurn"
      tasksPoolQueue[1] = "taskTwosTurn"
    else:
      tasksPoolQueue[0] = "taskTwosTurn"
      tasksPoolQueue[1] = "taskOnesTurn"

    return 0


determine_pool_queue_ordering = PythonOperator(
    task_id='determine_pool_queue_ordering',
    retries=0,
    dag=dag,
    provide_context=True,
    python_callable=createLogicalOrderingOfTaskPoolQueue,
    op_args=[])

task1 = PythonOperator(
    task_id='task1',
    retries=0,
    dag=dag,
    provide_context=True,
    python_callable=taskOnesFunction,
    op_args=[])

task2= PythonOperator(
    task_id='task2',
    retries=0,
    dag=dag,
    provide_context=True,
    python_callable=taskTwosFunction,
    op_args=[])

determine_pool_queue_ordering.set_downstream(task1)
determine_pool_queue_ordering.set_downstream(task2)

Так что, надеюсь, каждый сможет следовать моему псевдокоду.Я не знаю, каков будет лучший способ создания пользовательского пула, в котором нет «условия гонки», поэтому идея с очередью списка была тем, что я придумал на первый взгляд.Но главное здесь заключается в том, что task1 и task2 будут работать одновременно, НО внутри их функции. Я могу сделать так, чтобы функция не делала ничего значимого, пока не пройдет оператор if, препятствующий запуску реального кода.

Первая задача будет динамически определять, какие задачи будут выполняться первыми и в каком порядке, используя список.Затем все функции, которые должны быть в этом пользовательском пуле, ссылаются на этот список.Поскольку наши операторы if равны true, когда их имя-задачи первое в списке, это означает, что одновременно может выполняться только одна задача.Первая задача в списке удалит себя из списка, как только она выполнит обработку того, что ей нужно сделать.Тогда другие задачи будут находиться в спящем режиме, пока они ожидают, что их имя будет первым в списке.

Так что просто сделайте некоторую пользовательскую логику, похожую на мою.

0 голосов
/ 17 мая 2019

Вот оператор, который создает пул, если он не существует.

from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class CreatePoolOperator(BaseOperator):
    # its pool blue, get it?
    ui_color = '#b8e9ee'

    @apply_defaults
    def __init__(
            self,
            name,
            slots,
            description='',
            *args, **kwargs):
        super(CreatePoolOperator, self).__init__(*args, **kwargs)
        self.description = description
        self.slots = slots
        self.name = name

    def execute(self, context):
        try:
            pool = get_pool(name=self.name)
            if pool:
                self.log(f'Pool exists: {pool}')
                return
        except PoolNotFound:
            # create the pool
            pool = create_pool(name=self.name, slots=self.slots, description=self.description)
            self.log(f'Created pool: {pool}')

удаление пула может быть сделано аналогичным образом.

0 голосов
/ 05 октября 2018

Вместо того, чтобы пытаться заставить работать динамический пул, посмотрите, поможет ли атрибут concurrency в airflow.models.DAG.Это ограничит количество запущенных задач внутри текущего запуска процесса.

...