Я пишу конвейер Airflow, который включает запись результатов в файл csv, расположенный в моей локальной файловой системе.
Я использую MacOS, и путь к файлу похож на / User / name / file_path / file_name .csv)
Вот мой код:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
import os
from airflow.operators.python_operator import PythonOperator
#Import boto3 module
import boto3
import logging
from botocore.exceptions import ClientError
import csv
import numpy as np
import pandas as pd
bucket='my_bucket_name'
s3 = boto3.resource('s3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY
)
def load_into_csv(years):
df = pd.DataFrame()
for year in years:
for buckett in s3.buckets.all():
for aobj in buckett.objects.filter(Bucket=bucket,Prefix=PREFIX):
if year in aobj.key:
bucket_name= "'{}' ".format(buckett.name)
the_key= "'{}' ".format(aobj.key)
last_mod= "'{}' ".format(aobj.last_modified)
stor_class= "'{}' ".format(aobj.storage_class)
size_1= "'{}' ".format(aobj.size)
dd = {'bucket_name':[bucket_name], 'S3_key_path':[the_key], 'last_modified_date':[last_mod], 'storage_class':[stor_class], 'size':[size_1] }
df_2 = pd.DataFrame(data=dd)
df = df.append(df_2, ignore_index=True)
#Get local directory
path=os.getcwd()
export_csv = df.to_csv (r'{}/results.csv'.format(path) ,index = None, header=True)
load_into_csv(years)
#######################################################################################################################
default_args = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime(2020,1,1),
'email': ['email@aol.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
dag = DAG('bo_v1',
description = 'this is a test script',
default_args=default_args,
schedule_interval= '@once',
catchup = False )
years=['2017','2018','2019']
for year in years:
t1 = PythonOperator(
task_id='load 2017',
python_callable= load_into_csv,
provide_context=False,
dag = dag)
Если вы посмотрите на переменную path, я попытаюсь собрать локальный путь os и затем установить его в качестве выходного файла в csv экспорта переменная, но безрезультатно.
Есть ли способ указать путь к локальному файлу MacOS (/Users/name/path/file_name.csv) в качестве пути к файлу в переменной export_to_csv? Я новичок в Airflow, поэтому любые идеи или предложения помогут !!!