I created a DAG that looks like this:
from datetime import datetime as dt, timedelta, date
from airflow import models, DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataProcPySparkOperator, \
    DataprocClusterDeleteOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
current = date.today()
yesterday = str(current - timedelta(days=1))
BUCKET = "gs://r_etl"
PYSPARK_JOB = BUCKET + "/spark_job/reddit-spark.py"
REDDIT_JOB = BUCKET + "/reddit_job/reddit_daily_load.py"
# The two variables above are examples of values that could instead be pulled from Airflow Variables
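# (Illustrative sketch only, not part of the original DAG) If these paths were stored as
# Airflow Variables, they could be read with Variable.get(); "reddit_etl_bucket" is a
# hypothetical Variable key:
#   BUCKET = Variable.get("reddit_etl_bucket", default_var="gs://r_etl")
#   PYSPARK_JOB = BUCKET + "/spark_job/reddit-spark.py"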
DEFAULT_DAG_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": dt(2020, 6, 19),
    "email_on_retry": False,
    "email_on_failure": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "project_id": "reddit-etl"
}
with DAG("reddit_submission_etl", default_args=DEFAULT_DAG_ARGS, catchup=False, schedule_interval="0 0 * * *") as dag:
    create_cluster = DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        master_machine_type="n1-standard-1",
        worker_machine_type="n1-standard-1",
        num_workers=2,
        region="us-east1",
        zone="us-east1-b",
        metadata='PIP_PACKAGES=pandas praw google-cloud-storage',
        init_actions_uris='gs://goog-dataproc-initialization-actions-us-east1/python/pip-install.sh'
    )

    submit_reddit = DataProcPySparkOperator(
        task_id="run_reddit_etl",
        main=REDDIT_JOB,
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        region="us-east1"
    )

    bq_load_submissions = GoogleCloudStorageToBigQueryOperator(
        task_id="bq_load_submissions",
        bucket="r_etl",
        source_objects=["submissions_store/" + yesterday + "*"],
        destination_project_dataset_table="reddit-etl.data_analysis.submissions",
        autodetect=True,
        source_format="NEWLINE_DELIMITED_JSON",
        create_disposition="CREATE_IF_NEEDED",
        skip_leading_rows=0,
        write_disposition="WRITE_APPEND",
        max_bad_records=0
    )

    submit_pyspark = DataProcPySparkOperator(
        task_id="run_pyspark_etl",
        main=PYSPARK_JOB,
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        region="us-east1"
    )

    bq_load_analysis = GoogleCloudStorageToBigQueryOperator(
        task_id="bq_load_analysis",
        bucket="r_etl",
        source_objects=["spark_results/" + yesterday + "_calculations/part-*"],
        destination_project_dataset_table="reddit-etl.data_analysis.submission_analysis",
        autodetect=True,
        source_format="NEWLINE_DELIMITED_JSON",
        create_disposition="CREATE_IF_NEEDED",
        skip_leading_rows=0,
        write_disposition="WRITE_APPEND",
        max_bad_records=0
    )

    delete_cluster = DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        region="us-east1",
        trigger_rule=TriggerRule.ALL_DONE
    )

    create_cluster.dag = dag
    create_cluster.set_downstream(submit_reddit)
    submit_reddit.set_downstream(bq_load_submissions)
    bq_load_submissions.set_downstream(submit_pyspark)
    submit_pyspark.set_downstream(bq_load_analysis)
    bq_load_analysis.set_downstream(delete_cluster)
When I put this DAG into Airflow and try to run it, it fails with this error in the logs:
[2020-06-19 08:51:40,397] {taskinstance.py:1059} ERROR - <HttpError 400 when requesting https://dataproc.googleapis.com/v1beta2/projects/reddit-etl/regions/us-east1/clusters?requestId=42674e7e-6b08-4829-9d7f-193a04e29888&alt=json returned "Invalid value at 'cluster.config.gce_cluster_config.metadata' (type.googleapis.com/google.cloud.dataproc.v1beta2.GceClusterConfig.MetadataEntry), "PIP_PACKAGES=pandas praw google-cloud-storage"">
For my project I need pandas, PRAW, and google-cloud-storage installed so the first task after cluster creation runs correctly. Earlier I created another cluster by hand that installed those packages and ran a workflow template on it, and that did work:
REGION="us-east1"
gcloud dataproc clusters create spark-dwh \
--scopes=default \
--region "us-east1" --zone "us-east1-b" \
--master-machine-type n1-standard-2 \
--master-boot-disk-size 200 \
--num-workers 2 \
--worker-machine-type n1-standard-2 \
--worker-boot-disk-size 200 \
--metadata 'PIP_PACKAGES=pandas praw google-cloud-storage' \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/python/pip-install.sh \
--image-version 1.4
That cluster is what I based the initialization action and metadata in the DAG on. Is it possible to install Python libraries this way, or do I need to use an already-existing cluster?
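Based on the 400 error, my guess is that DataprocClusterCreateOperator expects metadata as a dict of key/value pairs and init_actions_uris as a list of GCS URIs, rather than the plain strings I passed (which mirror the gcloud flags). An untested sketch of how that one task might look, with everything else unchanged:

    create_cluster = DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        master_machine_type="n1-standard-1",
        worker_machine_type="n1-standard-1",
        num_workers=2,
        region="us-east1",
        zone="us-east1-b",
        # metadata as a mapping instead of a single "KEY=value" string
        metadata={"PIP_PACKAGES": "pandas praw google-cloud-storage"},
        # init_actions_uris as a list of URIs instead of a bare string
        init_actions_uris=["gs://goog-dataproc-initialization-actions-us-east1/python/pip-install.sh"]
    )

Is that the right direction for installing packages on an ephemeral cluster, or is a long-running cluster with the packages preinstalled the only option?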