Я устанавливаю несколько конвейеров ETL на Googles Composer Airflow, развернутых на GKE с 3 узлами. Минимум для Airflow Compose из GCP!
Версия: 1.10.1-composer
GCP Версия изображения: composer-1.6.0-airflow-1.10.1
Я быобычно входите в систему airflow и пытайтесь отлаживать через Ipython, но это сложно сделать при настройке GKE. Кажется, я не могу найти подходящее место для запуска интерактивных тестов для отладки.
Оператор Python: использование стандартного состояния GSheetHook
def pull_sheet(execution_date=None):
hook = GSheetHook()
sheet_data = hook.get_values_df('SHEET_ID_XXXXX',
'EXAMPLEXXXXX!A1:J4305', shape_column=None)
return print(sheet_data)
STALE в журналах воздушных потоков. Задание было оставлено на один день (24 часа) без ошибки тайм-аута или какой-либо ошибки вообще, никогда не отмечалось для повторной попытки. Приведенный ниже фрагмент и скриншот журналов планировщика - единственная информация, которая у меня есть о запущенных задачах. Из журналов планировщика похоже, что задача продолжает выполняться без подтверждения каких-либо изменений состояния ... Журналы воздушного потока
[2019-10-21 13:15:07,431] {models.py:1361} INFO - Dependencies all met for <TaskInstance: gsheet_test.pull_gsheet 2019-10-20T02:00:00+00:00 [queued]>
[2019-10-21 13:15:07,441] {models.py:1361} INFO - Dependencies all met for <TaskInstance: gsheet_test.pull_gsheet 2019-10-20T02:00:00+00:00 [queued]>
[2019-10-21 13:15:07,442] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of
-------------------------------------------------------------------------------
[2019-10-21 13:15:07,490] {models.py:1595} INFO - Executing <Task(PythonOperator): pull_gsheet> on 2019-10-20T02:00:00+00:00
[2019-10-21 13:15:07,491] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run gsheet_test pull_gsheet 2019-10-20T02:00:00+00:00 --job_id 70970 --raw -sd DAGS_FOLDER/gsheet_test.py --cfg_path /tmp/tmp3xukhrnx']
Любая помощь приветствуется !!