Воздушный поток с Kubernetes Executor - Ошибка: Файловая система только для чтения: '/airflow/dags/git/test.csv - PullRequest
0 голосов
/ 15 октября 2019

Я пытаюсь записать результаты пересечения таблиц в файл CSV с помощью воздушного потока (который установлен внутри Kubernetes), но я получаю сообщение о том, что файл CSV является файлом чтения. Есть ли какой-либо параметр, который я могу изменить в сценарии, который заставляет его писать результат?

def update_table():
    # Connecting to BigQuery
    client = bigquery.Client()
    query = """SELECT ltrim(rtrim(col1)) as col1,
                 sum(col2) as col2
                 FROM dataset.table
                 GROUP BY 1
                 ORDER BY col1 desc """
    job = client.query(query)
    df_tst = job.to_dataframe()

    # Connecting to BigQuery 
    query_mer = """SELECT distinct col1 FROM dataset.table2 """
    mer_job = client.query(query_mer)
    df_mer = mer_job.to_dataframe()

    # Comparing both tables

    nomes = df_tst.col1.tolist()
    #categorizacao_merchants
    nomes_mer = df_merchants.col1.tolist()
    lista = list(np.setdiff1d(nomes, nomes_mer))

    for x in lista:
        with open('/airflow/dags/git/test.csv','a', newline='') as f:
            writer = csv.writer(f, delimiter=';')
            writer.writerow([x])
            f.close()

with DAG('update_cat', default_args=default_args, description='Python DAG', schedule_interval='0 0 * * 0', start_date=airflow.utils.dates.days_ago(0), catchup=False) as dag:
        python_task = PythonOperator(task_id='python_task', python_callable=update_table, dag=dag)
...