Версия воздушного потока: 1.9.0
В файле dag воздушного потока у меня есть задача PythonOperator
с именем run_query
, которая задает следующую переменную xcom в своей функции python_callable:
kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)
В последнем том же dag у меня есть задача S3KeySensor
, которая использует вышеуказанное расположение для своего параметра bucket_key:
S3KeySensor(task_id = 'check_file_in_s3',
bucket_key = '{{ ti.xcom_pull(task_ids="run_query",key="query_result_loc") }}' ,
bucket_name = None,
wildcard_match = False,
poke_interval=60,
timeout=1200,
dag = dag
)
Теперь, когда я запускаю dag (либо в тестовом режиме, либо в режиме trigger_dag), S3KeySensor жалуется на отсутствующее bucket_name, которое исходит из этого кода в S3KeySensor определении :
class S3KeySensor(BaseSensorOperator):
"""
Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
S3 being a key/value it does not support folders. The path is just a key
a resource.
:param bucket_key: The key being waited on. Supports full s3:// style url
or relative path from root level.
:type bucket_key: str
:param bucket_name: Name of the S3 bucket
:type bucket_name: str
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
:type wildcard_match: bool
:param aws_conn_id: a reference to the s3 connection
:type aws_conn_id: str
"""
template_fields = ('bucket_key', 'bucket_name')
@apply_defaults
def __init__(
self, bucket_key,
bucket_name=None,
wildcard_match=False,
aws_conn_id='aws_default',
*args, **kwargs):
super(S3KeySensor, self).__init__(*args, **kwargs)
# Parse
if bucket_name is None:
parsed_url = urlparse(bucket_key)
if parsed_url.netloc == '':
raise AirflowException('Please provide a bucket_name')
else:
bucket_name = parsed_url.netloc
if parsed_url.path[0] == '/':
bucket_key = parsed_url.path[1:]
else:
bucket_key = parsed_url.path
self.bucket_name = bucket_name
self.bucket_key = bucket_key
Похоже, что шаблон не обрабатывается при этомэтап.
Если я закомментирую, что если блок, он работает нормально.Является ли это ошибкой или неправильным использованием полей шаблона?
Обновление, основанное на комментарии @ kaxil:
- Без имени bucket_name и с блоком 'if', оставленным без комментариев, airflowне может даже обнаружить дага.В пользовательском интерфейсе вместо этого я вижу эту ошибку:
Broken DAG: [/XXXX/YYYY/project_airflow.py] Please provide provide a bucket_name
Без указания bucket_name, но со следующим изменением в блоке if (см. Удаление проверки if parsed_url.netloc == ''
), онаотлично работает:
if bucket_name is None:
parsed_url = urlparse(bucket_key)
bucket_name = parsed_url.netloc
if parsed_url.path[0] == '/':
bucket_key = parsed_url.path[1:]
else:
bucket_key = parsed_url.path
При наличии имени bucket_name он отлично работает с отображаемым значением для bucket_key
и bucket_name
на вкладке Rendered Template
.