У меня есть группа обеспечения доступности баз данных, которая должна выполнить оператор Python и передать полученную строку в виде сообщения оператору PubsubPublish.
Мой код ниже печатает сообщения отлично, но когда я загружаю этот DAG в airflow, он не загружается.Я думаю, что это структура моей группы доступности базы данных, и оператор pubsubpublish не может прочитать параметр 'messages'
Я пытался использовать сообщение в качестве шаблонного поля, но это тоже не помогло.
def download_yaml():
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)
content_blob=blob.download_as_string()
encoded_string = base64.b64encode(content_blob)
return encoded_string
encoded_string = download_yaml()
messages = [
{'data': b64encode(encoded_string)},
]
print messages
dag= DAG('pubsub-message-docker', default_args=default_args,schedule_interval=timedelta(days=1))
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages', messages=messages,dag=dag)
t1= PythonOperator(task_id='download_yaml_as_string',provide_context=True,python_callable=download_yaml,dag=dag)
t1.set_downstream(t2)
Я могу напечатать 'закодированную строку', однако мне нужно передать encoded_string как сообщение в моем операторе pubsubpublish, чтобы это было опубликовано.