IndexError: строковый индекс вне диапазона (Airflow & Python) - PullRequest
0 голосов
/ 03 октября 2019

Я создаю процесс передачи информации из BigQuery в облачное хранилище через Airflow. Я использую циклическую функцию, которая создает различные задачи, и идея состоит в том, чтобы передавать информацию из всех имеющихся у нас таблиц, которая будет появляться автоматически. Но, к сожалению, я столкнулся с еще одной ошибкой кодирования в Python.

Я посмотрел на несколько похожих вопросов, но, честно говоря, не смог связать его с моим кодом и заставить его работать должным образом. Я знаю, что есть данные в переменных "lista_tabelas" и "qtd_tabelas". Что может быть изменено в коде, чтобы процесс работал должным образом.

def get_data_from_bigquery():
    bq = bigquery.Client()
    query = """ SELECT dataset_id, table_id, CONCAT(dataset_id,'.',table_id) as dataset_table FROM dataset.__TABLES__  """
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    qtd_tabelas = len(rows)
    Variable.set("lista_tabelas", rows)
    Variable.set("qtd_tabelas", int(qtd_tabelas))

def prepare_backup():
    a = Variable.get("lista_tabelas")
    a_len = Variable.get("qtd_tabelas")
    for i in range(int(a_len)):
        #print("Start - Dataset: %s, Table: %s, DataSet.Table: %s" %(a[i][0],a[i][1],a[i][2]))
        bq_to_gcp_json = BigQueryToCloudStorageOperator(
            task_id='bq_to_gcs_json',
            dag=dag,
            source_project_dataset_table='dataset_test:'+a[i][2],
            destination_cloud_storage_uris=['gs://'+nm_bucket+'/backup/'+a[i][0]+'/'+{{ ts_nodash }}+'/'+a[i][1]+'_*.json'],
            export_format='NEWLINE_DELIMITED_JSON',
            bigquery_conn_id='gcp')
        bq_to_gcp_json.set_upstream(Python_2)

Python_1 = PythonOperator(task_id='bq_list_tables',
    python_callable=get_data_from_bigquery,
    dag=dag)

Python_2 = PythonOperator(task_id='backup_all_tables',
    python_callable=prepare_backup,
    dag=dag)

Python_2.set_upstream(Python_1)

Сообщение об ошибке:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/airflow/dags/git/backup_bq_to_gcs.py", line 133, in prepare_backup
    source_project_dataset_table='dataset_test:'+a[i][2],
IndexError: string index out of range
...