Воздушный поток - Access Xcom в BranchPythonOperator - PullRequest
1 голос
/ 10 апреля 2019

Я интенсивно искал блоги и документацию по воздушным потокам для отладки возникшей проблемы.

Что я пытаюсь решить

  1. Проверкаесли определенный файл существует на ftp-сервере

  2. , если он существует, загрузить его в облако

  3. Если его не существует, отправьте электронное письмоклиенту, сообщающему, что файл не найден

Что у меня есть

  1. Пользовательский оператор, расширяющий BaseOperator, который используетSSH Hook и выдвигает значение (true или false).

  2. Задача, которая использует BranchPythonOperator, чтобы извлечь значение из xcom и проверить, вернула ли предыдущая задача значение true или false, и принять решение оследующее задание.

Пожалуйста, посмотрите код ниже.Этот код является упрощенной версией того, что я пытаюсь сделать.

Если кто-то заинтересовался моим исходным кодом, прокрутите вниз до конца вопроса.

Здесь пользовательский оператор просто возвращает String Even или Odd, основываясь на четной или нечетной минуте.

import logging

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from datetime import datetime

log = logging.getLogger(__name__)

class MediumTestOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 do_xcom_push=True,
                 *args,
                 **kwargs):
        super(MediumTestOperator, self).__init__(*args, **kwargs)
        self.do_xcom_push = do_xcom_push
        self.args = args
        self.kwargs = kwargs

    def execute(self, context):
        # from IPython import embed; embed()
        current_minute = datetime.now().minute

        context['ti'].xcom_push(key="Airflow", value="Apache Incubating")

        if current_minute %2 == 0:
            context['ti'].xcom_push(key="minute", value="Even")
        else:
            context['ti'].xcom_push(key="minute", value="Odd")
        # from IPython import embed; embed()


class MediumTestOperatorPlugin(AirflowPlugin):
    name = "medium_test"
    operators = [MediumTestOperator]

Файл: caller.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from medium_payen_op import MediumTestOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'guillaume',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'email': ['hello@moonshots.ai'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG(
    'Weekday',
    default_args=default_args,
    schedule_interval="@once")


sample_task = MediumTestOperator(
    task_id='task_1',
    provide_context=True,
    dag=dag
)


def get_branch_follow(**kwargs):
    x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
    print("From Kwargs: ", x)
    if x == 'Even':
        return 'task_3'
    else:
        return 'task_4'


task_2 = BranchPythonOperator(
    task_id='task_2_branch',
    python_callable=get_branch_follow,
    provide_context=True,
    dag=dag
)


def get_dample(**kwargs):
    x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
    y = kwargs['ti'].xcom_pull(task_ids='task_1', key="Airflow")
    print("Minute is:", x, " Airflow is from: ", y)
    print("Task 3 Running")


task_3 = PythonOperator(
    python_callable=get_dample,
    provide_context=True,
    dag=dag,
    task_id='task_3'
)


def get_dample(**kwargs):
    x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
    y = kwargs['ti'].xcom_pull(task_ids='task_1', key="Airflow")
    print("Minute is:", x, " Airflow is from: ", y)
    print("Task 4 Running")


task_4 = PythonOperator(
    python_callable=get_dample,
    provide_context=True,
    dag=dag,
    task_id='task_4'
)

sample_task >> task_3

task_2 >> task_3
task_2 >> task_4

Как видно из прикрепленных изображений, push-уведомление Xcom сработало, и я могу получить значения из PythonOperator, но не из BranchPythonOperator.

Любая помощь приветствуется.

Извлечение Xcom изнутриPython Callable для BranchPythonOperator всегда возвращает «None», в результате чего блок Else выполняется всегда.PythonBranchOperator Logs - Xcom_Pull returns 'None'

Древовидное представление DAG Tree View of the DAG

Значения XCom с экрана администратора XCom Values from Admin Screen

Xcom Pull из PythonOperator возвращает правильные значения.Xcom Pull from Python Operator and it works

Xcom Pull - different value


Это оригинальный код, с которым я работаю

Пользовательский оператор выдвигает строку True или False в качестве значения Xcom, которое затем читается BranchPythonOperator.

Я хочу прочитать значение, выдвигаемое задачей, созданной с помощью указанного выше пользовательского оператора, внутри задачи BranchPythonOperator и выбрать другойпуть на основе возвращаемого значения.

Файл: check_file_exists_operator.py

import logging
from tempfile import NamedTemporaryFile

from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class CheckFileExistsOperator(BaseOperator):
    """
    This operator checks if a given file name exists on the
    the sftp server.

    Returns true if it exists, false otherwise.

    :param sftp_path_prefix: The sftp remote path. This is the specified file path
        for downloading the file from the SFTP server.
    :type sftp_path_prefix: string
    :param file_to_be_processed: File that is to be Searched 
    :type file_to_be_processed: str
    :param sftp_conn_id: The sftp connection id. The name or identifier for
        establishing a connection to the SFTP server.
    :type sftp_conn_id: string
    :param timeout: timeout (in seconds) for executing the command.
    :type timeout: int
    :param do_xcom_push: return the stdout which also get set in xcom by
           airflow platform
    :type do_xcom_push: bool

    """

    FORWARD_SLASH_LITERAL = '/'

    template_fields = ('file_to_be_processed',)

    @apply_defaults
    def __init__(self,
                 sftp_path_prefix,
                 file_to_be_processed,
                 sftp_conn_id='ssh_default',
                 timeout=10,
                 do_xcom_push=True,
                 *args,
                 **kwargs):
        super(CheckFileExistsOperator, self).__init__(*args, **kwargs)
        self.sftp_path_prefix = sftp_path_prefix
        self.file_to_be_processed = file_to_be_processed
        self.sftp_conn_id = sftp_conn_id
        self.timeout = timeout
        self.do_xcom_push = do_xcom_push
        self.args = args
        self.kwargs = kwargs

    def execute(self, context):

        # Refer to https://docs.paramiko.org/en/2.4/api/sftp.html
        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
        sftp_client = ssh_hook.get_conn().open_sftp()

        sftp_file_absolute_path = self.sftp_path_prefix.strip() + \
                                  self.FORWARD_SLASH_LITERAL + \
                                  self.file_to_be_processed.strip()

        task_instance = context['task_instance']

        log.debug('Checking if the follwoing file exists: %s', sftp_file_absolute_path)

        try:
            with NamedTemporaryFile("w") as temp_file:
                sftp_client.get(sftp_file_absolute_path, temp_file.name)

                # Return a string equivalent of the boolean.
                # Returning a boolean will make the key unreadable
                params = {'file_exists' : True}
                self.kwargs['params'] = params
                task_instance.xcom_push(key="file_exists", value='True')

                log.info('File Exists, returning True')

                return 'True'

        except FileNotFoundError:
            params = {'file_exists' : False}
            self.kwargs['params'] = params
            task_instance.xcom_push(key="file_exists", value='False')

            log.info('File Does not Exist, returning False')

            return 'False'


class CheckFilePlugin(AirflowPlugin):
    name = "check_file_exists"
    operators = [CheckFileExistsOperator]

Файл: airflow_dag_sample.py

import logging

from airflow import DAG
from check_file_exists_operator import CheckFileExistsOperator
from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import timedelta, datetime
from dateutil.relativedelta import relativedelta
from airflow.operators.email_operator import EmailOperator

log = logging.getLogger(__name__)
FORWARD_SLASH_LITERAL = '/'

default_args = {
    'owner': 'gvatreya',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email': ['***@***.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=2),
    'timeout': 10,
    'sftp_conn_id': 'sftp_local_cluster',
    'provide_context': True
}

dag = DAG('my_test_dag',
          description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20),
          default_args=default_args,
          template_searchpath='/Users/your_name/some_path/airflow_home/sql',
          catchup=False)

template_filename_from_xcom = """
    {{ task_instance.xcom_pull(task_ids='get_fname_ships', key='file_to_be_processed', dag_id='my_test_dag') }}
"""

template_file_prefix_from_xcom = """
    {{ task_instance.xcom_pull(task_ids='get_fname_ships', key="month_prefix_for_file", dag_id='my_test_dag') }}
"""

t_check_file_exists = CheckFileExistsOperator(
    sftp_path_prefix='/toDjembe',
    file_to_be_processed=template_filename_from_xcom.strip(),
    sftp_conn_id='sftp_local_cluster',
    task_id='check_file_exists',
    dag=dag
)


def branch(**kwargs):
    file_exist = kwargs['task_instance'].xcom_pull(task_ids='get_fname_ships', key="file_exists",
                                                   dag_id='my_test_dag')
    print(template_filename_from_xcom)
    from IPython import embed; embed()
    log.debug("FILE_EXIST(from branch): %s", file_exist)
    if file_exist:
        return 's3_upload'
    else:
        return 'send_file_not_found_email'


t_branch_on_file_existence = BranchPythonOperator(
    task_id='branch_on_file_existence',
    python_callable=branch,
    dag=dag
)

t_send_file_not_found_email = EmailOperator(
    task_id='send_file_not_found_email',
    to='***@***.com',
    subject=template_email_subject.format(state='FAILURE',filename=template_filename_from_xcom.strip(),content='Not found on SFTP Server'),
    html_content='File Not Found in SFTP',
    mime_charset='utf-8',
    dag=dag
)

t_upload_to_s3 = SFTPToS3Operator(
    task_id='s3_upload',
    sftp_conn_id='sftp_local_cluster',
    sftp_path='/djembe/' + template_filename_from_xcom.strip(),
    s3_conn_id='s3_conn',
    s3_bucket='djembe-users',
    s3_key='gvatreya/experiment/' + template_file_prefix_from_xcom.strip() + FORWARD_SLASH_LITERAL + template_filename_from_xcom.strip(),
    dag=dag
)

t_check_file_exists >> t_branch_on_file_existence

t_branch_on_file_existence >> t_upload_to_s3
t_branch_on_file_existence >> t_send_file_not_found_email

Однако, когда я запускаю код,Оператор ветвления всегда видит строку «Нет».

Однако Xcom имеет значение true.

Я попытался отладить с помощью IPython embed() и вижу, что в экземпляре задачи не содержится значение xcom.Я пытался использовать params и другие вещи, о которых мог подумать, но безрезультатно.

Потратив на это несколько дней, я начинаю думать, что упустил что-то решающее в XCom в Airflow.

Надеясь, что кто-нибудь может помочь.

Заранее спасибо.

1 Ответ

2 голосов
/ 10 апреля 2019

Я думаю, проблема в зависимости.

В настоящее время у вас есть следующее:

sample_task >> task_3

task_2 >> task_3
task_2 >> task_4

Измените его следующим образом, добавив строку sample_task >> tasK_2.

sample_task >> task_3
sample_task >> tasK_2

task_2 >> task_3
task_2 >> task_4

Ваша задача, которая передает xcom, должна быть запущена первой, прежде чем задача, использующая BranchPythonOperator

Во втором примере функция branch использует xcom_pull(task_ids='get_fname_ships', но я не могу найти задачу сget_fname_ships task_id.

...