Как записать в локальный путь к файлу в Airflow - MacOS - PullRequest
1 голос
/ 21 января 2020

Я пишу конвейер Airflow, который включает запись результатов в файл csv, расположенный в моей локальной файловой системе.

Я использую MacOS, и путь к файлу похож на / User / name / file_path / file_name .csv)

Вот мой код:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
import os
from airflow.operators.python_operator import PythonOperator
#Import boto3 module
import boto3
import logging
from botocore.exceptions import ClientError
import csv
import numpy as np
import pandas as pd

bucket='my_bucket_name'

s3 = boto3.resource('s3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
    )


def load_into_csv(years):
    df = pd.DataFrame()
    for year in years:
        for buckett in s3.buckets.all():
            for aobj in buckett.objects.filter(Bucket=bucket,Prefix=PREFIX):
                if year in aobj.key:
                    bucket_name= "'{}'  ".format(buckett.name)
                    the_key= "'{}'  ".format(aobj.key)
                    last_mod= "'{}'  ".format(aobj.last_modified)
                    stor_class= "'{}'  ".format(aobj.storage_class)
                    size_1= "'{}'  ".format(aobj.size)
                    dd = {'bucket_name':[bucket_name], 'S3_key_path':[the_key], 'last_modified_date':[last_mod], 'storage_class':[stor_class], 'size':[size_1] }
                    df_2 = pd.DataFrame(data=dd)
                    df = df.append(df_2, ignore_index=True)

                    #Get local directory 
                    path=os.getcwd()

                    export_csv = df.to_csv (r'{}/results.csv'.format(path) ,index = None, header=True)


load_into_csv(years)


#######################################################################################################################

default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2020,1,1),
    'email': ['email@aol.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}


dag = DAG('bo_v1',
          description = 'this is a test script',
          default_args=default_args,
          schedule_interval= '@once',
          catchup = False )


years=['2017','2018','2019']

for year in years:
    t1 = PythonOperator(
        task_id='load 2017',
        python_callable= load_into_csv,
        provide_context=False,
        dag = dag)

Если вы посмотрите на переменную path, я попытаюсь собрать локальный путь os и затем установить его в качестве выходного файла в csv экспорта переменная, но безрезультатно.

Есть ли способ указать путь к локальному файлу MacOS (/Users/name/path/file_name.csv) в качестве пути к файлу в переменной export_to_csv? Я новичок в Airflow, поэтому любые идеи или предложения помогут !!!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...