DAG не принимает параметризованное сообщение - PullRequest
1 голос
/ 11 июня 2019

У меня есть группа обеспечения доступности баз данных, которая должна выполнить оператор Python и передать полученную строку в виде сообщения оператору PubsubPublish.

Мой код ниже печатает сообщения отлично, но когда я загружаю этот DAG в airflow, он не загружается.Я думаю, что это структура моей группы доступности базы данных, и оператор pubsubpublish не может прочитать параметр 'messages'

Я пытался использовать сообщение в качестве шаблонного поля, но это тоже не помогло.

def download_yaml():
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)
content_blob=blob.download_as_string()
encoded_string = base64.b64encode(content_blob)
return encoded_string

encoded_string = download_yaml()
messages = [
     {'data': b64encode(encoded_string)},

] 
print messages
dag= DAG('pubsub-message-docker', default_args=default_args,schedule_interval=timedelta(days=1))
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages', messages=messages,dag=dag)

t1= PythonOperator(task_id='download_yaml_as_string',provide_context=True,python_callable=download_yaml,dag=dag)

t1.set_downstream(t2)

Я могу напечатать 'закодированную строку', однако мне нужно передать encoded_string как сообщение в моем операторе pubsubpublish, чтобы это было опубликовано.

Ответы [ 3 ]

0 голосов
/ 11 июня 2019

Вот два момента для вашего рассмотрения. 1. Для обмена информацией между операторами dag Xcom должен быть более официальным способом.

XComs позволяет задачам обмениваться сообщениями, предоставляя больше нюансов контроля и общего состояния. Название является аббревиатурой от «Кросс-связь». ...... Любой объект, который может использоваться как значение XCom, поэтому пользователи должны убедиться, что использовать объекты соответствующего размера.

XComs можно «толкнуть» (отправить) или «вытащить» (получить). .....

Задачи вызывают xcom_pull () для получения XComs, при необходимости применяя фильтры. на основе таких критериев, как ключ, исходные значения задачи и исходные значения dag_id. ......

https://airflow.apache.org/concepts.html#xcoms

  1. ваш файл python может запускаться и получать неизвестный результат, поскольку сообщения не связаны с задачей t1. Он просто инициализируется в начале функцией download_yml. Хотя t1 вызовите download_yml снова, но сообщения не меняются. Таким образом, T2 получает сообщения только с начальным значением. Чтобы решить эту проблему, вы должны отправить сообщения в t1 в Xcom, а также извлечь сообщения в t2 из Xcom.

Удачи.

WangYong

0 голосов
/ 18 июня 2019

Наконец-то я смог это решить :) Сообщение должно быть передано оператору pubsubpublish, как показано ниже:

сообщения = { 'данные' :( "{{task_instance.xcom_pull (ключ = 'encoded_string', 'task_ids download_yaml' =)}}")}

, а затем передать сообщения в сообщения параметров. t2 = PubSubPublishOperator (проект = проект, тема = тема, task_id = 'publish-messages', messages = messages, dag = dag)

С наилучшими пожеланиями Sakshi

0 голосов
/ 11 июня 2019

Если вы видите DAG в пользовательском интерфейсе, но получаете сообщение об ошибке, о которой упоминали в своем комментарии (главный планировщик не знает о его существовании), то я бы посоветовал сначала взглянуть на планировщик! Убедитесь, что у планировщика есть доступ к вашим группам DAG, и попробуйте перезапустить планировщик. Если у вас есть эта ошибка ... вы увидите DAG в веб-интерфейсе, но не сможете запустить ее или просмотреть ее журналы.

Что касается вашего другого вопроса относительно передачи сообщений в PubSubPublishOperator, я считаю, что он должен работать нормально!

...