Вам не обязательно использовать облачную функцию для определения файла в GCS, Composer имеет датчики GCS, которые можно использовать для достижения цели.
Предположим, вам необходимо отслеживать файлы в корзине / папке. /file_*.csv затем:
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
}
dag = DAG(
dag_id='GCS_sensor_dag',
schedule_interval=None,
default_args=args
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='folder/file_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag = dag
)
file_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',
bucket='bucketname',
prefix='folder/file_',
dag=dag
)
trigger = TriggerDagRunOperator(
task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
trigger_dag_id="GCS_sensor_dag",
dag=dag
)
file_sensor >> GCS_File_list >> trigger