Я создаю процесс передачи информации из BigQuery в облачное хранилище через Airflow. Я использую циклическую функцию, которая создает различные задачи, и идея состоит в том, чтобы передавать информацию из всех имеющихся у нас таблиц, которая будет появляться автоматически. Но, к сожалению, я столкнулся с еще одной ошибкой кодирования в Python.
Я посмотрел на несколько похожих вопросов, но, честно говоря, не смог связать его с моим кодом и заставить его работать должным образом. Я знаю, что есть данные в переменных "lista_tabelas" и "qtd_tabelas". Что может быть изменено в коде, чтобы процесс работал должным образом.
def get_data_from_bigquery():
bq = bigquery.Client()
query = """ SELECT dataset_id, table_id, CONCAT(dataset_id,'.',table_id) as dataset_table FROM dataset.__TABLES__ """
query_job = bq.query(query)
data = query_job.result()
rows = list(data)
qtd_tabelas = len(rows)
Variable.set("lista_tabelas", rows)
Variable.set("qtd_tabelas", int(qtd_tabelas))
def prepare_backup():
a = Variable.get("lista_tabelas")
a_len = Variable.get("qtd_tabelas")
for i in range(int(a_len)):
#print("Start - Dataset: %s, Table: %s, DataSet.Table: %s" %(a[i][0],a[i][1],a[i][2]))
bq_to_gcp_json = BigQueryToCloudStorageOperator(
task_id='bq_to_gcs_json',
dag=dag,
source_project_dataset_table='dataset_test:'+a[i][2],
destination_cloud_storage_uris=['gs://'+nm_bucket+'/backup/'+a[i][0]+'/'+{{ ts_nodash }}+'/'+a[i][1]+'_*.json'],
export_format='NEWLINE_DELIMITED_JSON',
bigquery_conn_id='gcp')
bq_to_gcp_json.set_upstream(Python_2)
Python_1 = PythonOperator(task_id='bq_list_tables',
python_callable=get_data_from_bigquery,
dag=dag)
Python_2 = PythonOperator(task_id='backup_all_tables',
python_callable=prepare_backup,
dag=dag)
Python_2.set_upstream(Python_1)
Сообщение об ошибке:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/airflow/dags/git/backup_bq_to_gcs.py", line 133, in prepare_backup
source_project_dataset_table='dataset_test:'+a[i][2],
IndexError: string index out of range