Datapro c перезаписывает память исполнителя - PullRequest
1 голос
/ 23 марта 2020

Раньше мы запускали искровое задание в кластере Had oop со следующими аргументами:

{
    'conn_id': 'spark_default',
    'num_executors': 10,
    'executor_cores': 4,
    'executor_memory': '15G',
    'driver_memory': '8G',
    'conf': {
            'spark.yarn.executor.memoryOverhead': '10G'
        }
}

Сейчас мы перемещаем задания в Datapro c, и нам не удается воспроизвести то же самое конфигурация:

Настраиваем кластер, нам достаточно vCPU и памяти

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
    task_id='create-%s' % CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME, 
    project_id=PROJECT_ID,
    num_workers=2,
    num_preemptible_workers=3,
    num_masters=1,
    master_machine_type='n1-highmem-8',
    worker_machine_type='n1-highmem-8',
    subnetwork_uri='projects/#####/regions/europe-west1/subnetworks/prod',
    custom_image="spark-instance",
    master_disk_size=50,
    worker_disk_size=50,
    storage_bucket=‘#####-dataproc-tmp', 
    region='europe-west1', 
    zone='europe-west1-b',
    auto_delete_ttl=7200, 
    dag=dag
)


job = dataproc_operator.DataProcPySparkOperator(
    task_id=TASK_ID,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region='europe-west1',
    main='%s/dist/main.py' % FOLDER,
    pyfiles='%s/dist/jobs.zip' % FOLDER,
    dataproc_pyspark_properties=spark_args,
    arguments=JOBS_ARGS,
    dag=dag
)

Использование

spark_args_powerplus = {
    'num_executors': '10',
    'executor_cores': '4',
    'executor_memory': '15G',
    'executor_memoryoverhead': '10G'
}

Похоже, что executor_memoryoverhead не учитывается, что приводит к работа провалиться. Есть ли в Datapro значение по умолчанию c, которого нам не хватает?

1 Ответ

2 голосов
/ 24 марта 2020

Datapro c не понимает эти свойства сокращения. например. num_executors должно быть spark.executor.instances вместо этого. Можете ли вы попробовать передать следующее вместо dataproc_pyspark_properties?

spark_args_powerplus = {
    'spark.executor.instances': '10',
    'spark.executor.cores': '4',
    'spark.executor.memory': '15G',
    'spark.executor.memoryOverhead': '10G'
}
...