Я пытаюсь выполнить то, что кажется простой задачей, но пока не могу заставить ее работать. Я хочу использовать Cloud Composer для сбора данных из базы данных SQL и сохранить их в GCS. У меня возникают проблемы с разрешениями
Вот мой DAG:
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
from airflow import models
import datetime
export_body = {
"exportContext": {
"kind": "sql#exportContext",
"fileType": "csv",
"uri": "gs://mybucket/export_sql.csv",
"csvExportOptions": {
"selectQuery": "select count(*) as number from some_table"
}
}
}
yesterday = datetime.datetime.combine(
datetime.datetime.today(),
datetime.datetime.min.time())
start_date = yesterday
JOB_NAME = "job_name"
default_args = {
'start_date': start_date,
}
with models.DAG(JOB_NAME,
schedule_interval="@hourly",
default_args=default_args) as dag:
sql_export_task = CloudSqlInstanceExportOperator(body=export_body,
project_id="project_id",
instance='instance',
task_id='sql_export_task')
sql_export_task
Я создал специальную учетную запись службы, выполняющую несколько ролей: Cloud SQL Admin Composer Worker Storage Object Creator
Когда я создаю среду, я указываю эту учетную запись, а затем загружаю вышеуказанную группу обеспечения доступности баз данных в соответствующую корзину.
Я получаю эту ошибку:
"error":
"code": 403
"message": "The service account does not have the required permissions for the bucket."
"errors":
"message": "The service account does not have the required permissions for the bucket."
"domain": "global"
"reason": "notAuthorized
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 643, in execut
body=self.body
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 247, in inner_wrappe
return func(self, *args, **kwargs
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_sql_hook.py", line 310, in export_instanc
'Exporting instance {} failed: {}'.format(instance, ex.content
AirflowException: Exporting instance prod failed:
"error":
"code": 403
"message": "The service account does not have the required permissions for the bucket."
"errors":
"message": "The service account does not have the required permissions for the bucket."
"domain": "global"
"reason": "notAuthorized
Я думал, что роль создателя хранилища должна датьмне разрешение. Должен ли я добавить другую роль в учетную запись службы? который из? Любой совет или решение о том, как действовать, будет наиболее ценно. Спасибо!
РЕДАКТИРОВАТЬ: я добавил роль администратора хранилища, и это удалило эту ошибку. Однако, несмотря на то, что моя группа доступности баз данных не работает.
Интерфейс воздушного потока посылает смешанные сигналы: у задачи нет статуса:
Но это как-то успех?
Я проверил свое ведро, отсутствует файл CSV, который, как я надеялся, будет создан.
Любой совет или решение о том, как действоватьбудет наиболее ценится. Спасибо!