Журналы Airflow нигде не найдены - PullRequest
0 голосов
/ 03 мая 2020

Я использую простой DAG, который должен загрузить пустой файл в корзину s3. Этот DAG работает, когда я запускаю airflow test и успешно загружает файл.

DAG сообщает, что он успешно запущен, но у меня нет файла в S3, но я также нигде не могу найти журналы, я не понимаю.

Я использую Celery Executor и имею следующие узлы.

Вопрос:

Как отладить тот факт, что файл не загружается? У меня только 1 папка лога для планировщика и содержимое пустое. Я не изменил никаких настроек по умолчанию.

Запуск Docker Контейнеры

CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS                    PORTS                                        NAMES
2eaa143b53da        puckel/docker-airflow:1.10.9   "/entrypoint.sh work…"   12 minutes ago      Up 12 minutes             5555/tcp, 8080/tcp, 8793/tcp                 dockerairflow_worker_1
c370b716db11        puckel/docker-airflow:1.10.9   "/entrypoint.sh sche…"   12 minutes ago      Up 12 minutes             5555/tcp, 8080/tcp, 8793/tcp                 dockerairflow_scheduler_1
b59cfcc06271        puckel/docker-airflow:1.10.9   "/entrypoint.sh webs…"   12 minutes ago      Up 12 minutes (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   dockerairflow_webserver_1
1f8afc27859f        puckel/docker-airflow:1.10.9   "/entrypoint.sh flow…"   12 minutes ago      Up 12 minutes             8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp   dockerairflow_flower_1
38674be37883        redis:5.0.5                    "docker-entrypoint.s…"   12 minutes ago      Up 12 minutes             6379/tcp                                     dockerairflow_redis_1
17f7ab0cabe2        postgres:9.6                   "docker-entrypoint.s…"   12 minutes ago      Up 12 minutes             5432/tcp                                     dockerairflow_postgres_1

Код DAG

from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, date
import boto3

def upload_file():
    s3 = boto3.resource('s3')
    BUCKET = "test_bucket_stackoverflow"
    open('my_file.txt', 'a').close()
    s3.Bucket(BUCKET).upload_file("my_file.txt", "dump/" + get_file_name() + "my_file.txt")

def get_file_name():
    today = date.today()
    return str(today.strftime("%d-%m-%Y-"))

default_args = {
            "owner": "airflow",
            "start_date": datetime.today(),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "",
            "retries": 1,
            "retry_delay": timedelta(minutes=5)
        }

with DAG(dag_id="s3_file_tester", schedule_interval=None, default_args=default_args, catchup=False) as dag:
    python_task = PythonOperator(task_id='s3_file_uploader', python_callable=upload_file)
...