Согласно этой статье , есть десять вещей, которые вы должны принять во внимание, если хотите повысить производительность своего кластера.
Возможно, будет хорошей идеей позволить Spark автоматически масштабировать количество исполнителей, установив для параметра spark.dynamicAllocation.enabled
значение true
. Обратите внимание, что эта конфигурация также требует включения параметра spark.shuffle.service.enabled
, см. документацию .
Второй подход к исполнителям объясняется здесь , если вы хотите попробовать В этой конфигурации другой поток Stackoverflow объясняет, как настроить параметр yarn.scheduler.capacity.resource-calculator
в Datapro c.
РЕДАКТИРОВАТЬ:
Я воссоздал Ваш сценарий со чтением многих файлов из корзины GCS, и я смог увидеть, что для выполнения этой операции было использовано более одного исполнителя.
Как?
Использование RDD.
Устойчивые распределенные наборы данных (RDD) - это коллекции неизменяемых объектов JVM, которые распределяются по адресу Apache Spark кластер. Данные в СДР разбиваются на куски на основе ключа, а затем распределяются по всем узлам исполнителя. СДР обладают высокой отказоустойчивостью, то есть могут быстро восстанавливаться после любых проблем, поскольку одни и те же блоки данных реплицируются на несколько узлов-исполнителей. Таким образом, даже если один исполнитель выйдет из строя, другой все равно будет обрабатывать данные.
Существует два способа создания RDD: распараллеливание существующей коллекции или обращение к набору данных во внешней системе хранения ( GCS bucket * ). СДР могут быть созданы с использованием методов textFile()
/ wholeTextFile()
SparkContext.
SparkContext.wholeTextFiles
позволяет читать каталог, содержащий несколько небольших файлов, и возвращает каждый из них в виде пар (имя файла, содержимое). Это отличается от SparkContext.textFile
, который возвращает по одной записи на строку в каждом файле.
Я написал код в Python и запустил задание pySpark в Datapro c:
import pyspark
sc = pyspark.SparkContext()
rdd_csv = sc.wholeTextFiles("gs://<BUCKET_NAME>/*.csv")
rdd_csv.collect()
Я вижу, что вы используете Scala язык. Пожалуйста, обратитесь к документации Spark , чтобы получить Scala фрагменты кода. Я предполагаю, что это будет примерно так:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ReadManyFiles {
def main(args: Array[String]) {
if (args.length != 1) {
throw new IllegalArgumentException(
"1 argument is required: <inputPath> ")
}
val inputPath = args(0)
val sc = new SparkContext(new SparkConf().setAppName("<APP_NAME>"))
val rdd_csv = sc.wholeTextFiles(inputPath)
rdd_csv.collect()
}
}
, где можно указать inputPath
при запуске задания Dataflow (или вы можете жестко запрограммировать его в файле .scala
):
gcloud dataproc jobs submit spark \
--cluster=${CLUSTER} \
--class <CLASS> \
--jars gs://${BUCKET_NAME}/<PATH>.jar \
-- gs://${BUCKET_NAME}/input/
Надеюсь, это тебе поможет. Если у вас есть еще вопросы, пожалуйста, задавайте.