Шаблонирование `bucket_key` в S3KeySensor в Apache Airflow - PullRequest
0 голосов
/ 24 мая 2018

Версия воздушного потока: 1.9.0

В файле dag воздушного потока у меня есть задача PythonOperator с именем run_query, которая задает следующую переменную xcom в своей функции python_callable:

kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)

В последнем том же dag у меня есть задача S3KeySensor, которая использует вышеуказанное расположение для своего параметра bucket_key:

S3KeySensor(task_id = 'check_file_in_s3',
                 bucket_key = '{{  ti.xcom_pull(task_ids="run_query",key="query_result_loc")  }}' ,
                 bucket_name = None,
                 wildcard_match = False,
                 poke_interval=60,
                 timeout=1200,
                 dag = dag
                 )

Теперь, когда я запускаю dag (либо в тестовом режиме, либо в режиме trigger_dag), S3KeySensor жалуется на отсутствующее bucket_name, которое исходит из этого кода в S3KeySensor определении :

    class S3KeySensor(BaseSensorOperator):
    """
    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
    S3 being a key/value it does not support folders. The path is just a key
    a resource.

    :param bucket_key: The key being waited on. Supports full s3:// style url
        or relative path from root level.
    :type bucket_key: str
    :param bucket_name: Name of the S3 bucket
    :type bucket_name: str
    :param wildcard_match: whether the bucket_key should be interpreted as a
        Unix wildcard pattern
    :type wildcard_match: bool
    :param aws_conn_id: a reference to the s3 connection
    :type aws_conn_id: str
    """
    template_fields = ('bucket_key', 'bucket_name')

    @apply_defaults
    def __init__(
                self, bucket_key,
                bucket_name=None,
                wildcard_match=False,
                aws_conn_id='aws_default',
                *args, **kwargs):
            super(S3KeySensor, self).__init__(*args, **kwargs)
            # Parse
            if bucket_name is None:
                parsed_url = urlparse(bucket_key)
                if parsed_url.netloc == '':
                    raise AirflowException('Please provide a bucket_name')
                else:
                    bucket_name = parsed_url.netloc
                    if parsed_url.path[0] == '/':
                        bucket_key = parsed_url.path[1:]
                    else:
                        bucket_key = parsed_url.path
            self.bucket_name = bucket_name
            self.bucket_key = bucket_key

Похоже, что шаблон не обрабатывается при этомэтап.

Если я закомментирую, что если блок, он работает нормально.Является ли это ошибкой или неправильным использованием полей шаблона?

Обновление, основанное на комментарии @ kaxil:

  • Без имени bucket_name и с блоком 'if', оставленным без комментариев, airflowне может даже обнаружить дага.В пользовательском интерфейсе вместо этого я вижу эту ошибку: Broken DAG: [/XXXX/YYYY/project_airflow.py] Please provide provide a bucket_name
  • Без указания bucket_name, но со следующим изменением в блоке if (см. Удаление проверки if parsed_url.netloc == ''), онаотлично работает:

    if bucket_name is None:
        parsed_url = urlparse(bucket_key)
        bucket_name = parsed_url.netloc
        if parsed_url.path[0] == '/':
            bucket_key = parsed_url.path[1:]
        else:
            bucket_key = parsed_url.path
    
  • При наличии имени bucket_name он отлично работает с отображаемым значением для bucket_key и bucket_name на вкладке Rendered Template.

1 Ответ

0 голосов
/ 06 ноября 2018

Вы, наверное, уже решили это, но это потому, что вы не предоставляете массив для task_ids.Формат должен быть task_ids=["run_query"].Изменение на это решило проблему для меня.

...