Воздушный поток: создайте оператора, который возвращает либо датчик, либо фиктивный оператор на основе результата перехвата. - PullRequest
0 голосов
/ 17 июня 2020

Я хотел бы знать, есть ли способ создать оператор, который выполняет ловушку pub / sub (или другую ловушку), которая не сработает, если объект уже существует. Если этот хук возвращает Exception, то мы активируем Sensor или продолжаем DAG, если нет.

Я пытался реализовать это, имея в виду следующий метакод, но пока не смог.

class CheckIfExistOperator(BaseOperator):
    def execute(self, context):
        try:
            PubSubHook(
                ...
            ).create_subscription(
                ...
                fail_if_exists=True
            )
            return DummyOperator(
                task_id='subscriber_already_exists',
                ...
                )
        except PubSubException as e:
            return PubSubPullSensor(
               ...
            )

Какие-либо предложения? Спасибо :)

Ответы [ 2 ]

2 голосов
/ 18 июня 2020

Вроде нужно какое-то ветвление. В приведенном ниже примере я использовал BranchPythonOperator для выполнения функции, которая пытается создать новую подписку и вернуть строку, информирующую, была ли задача выполнена успешно или неудачно. После этого я использовал два PythonOperators с task_id, соответствующими строкам, возвращаемым BranchPythonOperator, и определил зависимость между start_op и success и fail tasks

from airflow.contrib.hooks import gcp_pubsub_hook
from airflow.operators import python_operator
from airflow import models

def print_context1(ds, **kwargs):
    return 'THE TASK SUCCEDED'

def print_context2(ds, **kwargs):
    return 'THE TASK FAILED'

def start_function(ds, **kwargs):
    try:        
        response = gcp_pubsub_hook.PubSubHook().create_subscription(
        topic_project="my-project",
        topic="beam",
        subscription="beam_sub2",
        subscription_project=None, 
        ack_deadline_secs=10,
        fail_if_exists=True)
        return "succeeded"
    except gcp_pubsub_hook.PubSubException:
        return "failed"


default_dag_args = {
...
}

with models.DAG(
        'pubsub_airflow',
        default_args=default_dag_args) as dag:

    start_op = python_operator.BranchPythonOperator(
        task_id='start',
        provide_context=True,
        python_callable=start_function
    )

    success = python_operator.PythonOperator(
        task_id='succeeded',
        provide_context=True,
        python_callable=print_context1
    )

    fail = python_operator.PythonOperator(
        task_id='failed',
        provide_context=True,
        python_callable=print_context2
    )

    start_op >> [success,fail]

В вашем случае вы можете использовать этот код в качестве основы и заменить два PythonOperators на ваш DummyOperator и ваш датчик.

1 голос
/ 18 июня 2020

Если честно, я бы прочитал это как не-Pythoni c, а не то, что ожидает Airflow. Я был бы очень удивлен, если бы вы смогли заставить этот шаблон работать. Я просто хочу прояснить, что рекомендую переосмыслить проблему, имея в виду больше Python и Airflow.

Airflow изначально имеет шаблон ветвления и рекомендует использовать его как часть своих ключевых концепций ... вот документы, с которых я бы порекомендовал вам начать . Он предоставит вам все необходимые конечные функции.

...