Ошибка задания воздушного потока Глобальная переменная не распознана - PullRequest
0 голосов
/ 02 марта 2020

У меня есть следующее:

def fetch_sf_data():
    response1 = requests.get("https://company.my.salesforce.com/"+ReportID1+"?export=1&enc=UTF-8&xf=csv",
                  headers = sf.headers, cookies = {'sid' : sid})
    global salesforce_report
    salesforce_report_raw = pd.read_csv(io.StringIO(response1.text))
    salesforce_report = salesforce_report_raw[:-5]

def push_to_sql(salesforce_report):
    salesforce_report.to_sql('Daily_Report_SF',engine,if_exists='replace' ,index=False)

t1 = PythonOperator(
    task_id='fetch_sf_data',
    python_callable=fetch_sf_data,
    dag = dag 
)

t2 = PythonOperator(
    task_id='push_to_sql',
    python_callable=push_to_sql,
    dag=dag
)

t1 >> t2

Задача 1 выполняется без сбоев, но задача 2 не выполняется и возвращается следующий код ошибки:

TypeError: push_to_sql() missing 1 required positional argument: 'salesforce_report'

У меня сложилось впечатление, что, поскольку я объявил 'salesforce_report' как глобальную переменную, которую он без проблем передаст следующей задаче Разве это не тот случай, когда я использую воздушный поток? Что я должен сделать, чтобы Задача 2 не провалилась?

Спасибо за помощь!

1 Ответ

0 голосов
/ 02 марта 2020

Это не сработает. Это связано с тем, что в Airflow каждая задача может работать на другом компьютере, поэтому вы должны сохранить результат либо в Xcom , либо в S3 / GCS, либо в локальной файловой системе. А затем прочитайте этот файл в вашем следующем задании и наберите от sh до SQL.

...