Я использую простой DAG, который должен загрузить пустой файл в корзину s3. Этот DAG работает, когда я запускаю airflow test
и успешно загружает файл.
DAG сообщает, что он успешно запущен, но у меня нет файла в S3, но я также нигде не могу найти журналы, я не понимаю.
Я использую Celery Executor и имею следующие узлы.
Вопрос:
Как отладить тот факт, что файл не загружается? У меня только 1 папка лога для планировщика и содержимое пустое. Я не изменил никаких настроек по умолчанию.
Запуск Docker Контейнеры
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2eaa143b53da puckel/docker-airflow:1.10.9 "/entrypoint.sh work…" 12 minutes ago Up 12 minutes 5555/tcp, 8080/tcp, 8793/tcp dockerairflow_worker_1
c370b716db11 puckel/docker-airflow:1.10.9 "/entrypoint.sh sche…" 12 minutes ago Up 12 minutes 5555/tcp, 8080/tcp, 8793/tcp dockerairflow_scheduler_1
b59cfcc06271 puckel/docker-airflow:1.10.9 "/entrypoint.sh webs…" 12 minutes ago Up 12 minutes (healthy) 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp dockerairflow_webserver_1
1f8afc27859f puckel/docker-airflow:1.10.9 "/entrypoint.sh flow…" 12 minutes ago Up 12 minutes 8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp dockerairflow_flower_1
38674be37883 redis:5.0.5 "docker-entrypoint.s…" 12 minutes ago Up 12 minutes 6379/tcp dockerairflow_redis_1
17f7ab0cabe2 postgres:9.6 "docker-entrypoint.s…" 12 minutes ago Up 12 minutes 5432/tcp dockerairflow_postgres_1
Код DAG
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, date
import boto3
def upload_file():
s3 = boto3.resource('s3')
BUCKET = "test_bucket_stackoverflow"
open('my_file.txt', 'a').close()
s3.Bucket(BUCKET).upload_file("my_file.txt", "dump/" + get_file_name() + "my_file.txt")
def get_file_name():
today = date.today()
return str(today.strftime("%d-%m-%Y-"))
default_args = {
"owner": "airflow",
"start_date": datetime.today(),
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"email": "",
"retries": 1,
"retry_delay": timedelta(minutes=5)
}
with DAG(dag_id="s3_file_tester", schedule_interval=None, default_args=default_args, catchup=False) as dag:
python_task = PythonOperator(task_id='s3_file_uploader', python_callable=upload_file)