Могу ли я читать CSV-файлы из Google Storage с помощью Spark более чем у одного исполнителя? - PullRequest
1 голос
/ 14 апреля 2020

У меня есть несколько csv файлов, сжатых в корзине Google, они сгруппированы в папки по часам, что означает, что другое приложение сохраняет несколько таких файлов в папках с часами в названии.

В общем, тогда у меня есть приложение Spark, которое читает все эти файлы - тысячи из них - с простым кодом, подобным приведенному ниже:

sparkSession.read
      .format("csv")
      .option("sep", "\t")
      .option("header", false)
      .option("inferSchema", false)
      .csv(path))

Это займет больше часа, потому что они сжаты?

Я также заметил, что в интерфейсе Spark у меня только один исполнитель, но не более одного. Разве я не могу использовать несколько исполнителей для параллельного чтения этих файлов и ускорения обработки? Как это сделать? Я пытаюсь создать временное представление с файлами для дальнейших операторов SQL от Spark.

Я работаю в Datapro c со стандартной конфигурацией Yarn.

Ответы [ 2 ]

0 голосов
/ 14 апреля 2020

Ресурсы должны уже масштабироваться для вашего приложения уже динамически, обычно вам не нужно явно устанавливать номера исполнителя.

В вашем случае, зависит от того, насколько велик ваш набор данных, это может быть размер кластера или виртуальные машины слишком малы для обработки увеличенного размера входных данных, возможно, попробуйте увеличить количество виртуальных машин / узлов в вашем кластере, или использовать виртуальные машины с большим объемом оперативной памяти.

0 голосов
/ 14 апреля 2020

Согласно этой статье , есть десять вещей, которые вы должны принять во внимание, если хотите повысить производительность своего кластера.

Возможно, будет хорошей идеей позволить Spark автоматически масштабировать количество исполнителей, установив для параметра spark.dynamicAllocation.enabled значение true. Обратите внимание, что эта конфигурация также требует включения параметра spark.shuffle.service.enabled, см. документацию .

Второй подход к исполнителям объясняется здесь , если вы хотите попробовать В этой конфигурации другой поток Stackoverflow объясняет, как настроить параметр yarn.scheduler.capacity.resource-calculator в Datapro c.

РЕДАКТИРОВАТЬ:

Я воссоздал Ваш сценарий со чтением многих файлов из корзины GCS, и я смог увидеть, что для выполнения этой операции было использовано более одного исполнителя.

Как?

Использование RDD.

Устойчивые распределенные наборы данных (RDD) - это коллекции неизменяемых объектов JVM, которые распределяются по адресу Apache Spark кластер. Данные в СДР разбиваются на куски на основе ключа, а затем распределяются по всем узлам исполнителя. СДР обладают высокой отказоустойчивостью, то есть могут быстро восстанавливаться после любых проблем, поскольку одни и те же блоки данных реплицируются на несколько узлов-исполнителей. Таким образом, даже если один исполнитель выйдет из строя, другой все равно будет обрабатывать данные.

Существует два способа создания RDD: распараллеливание существующей коллекции или обращение к набору данных во внешней системе хранения ( GCS bucket * ). СДР могут быть созданы с использованием методов textFile() / wholeTextFile() SparkContext.

SparkContext.wholeTextFiles позволяет читать каталог, содержащий несколько небольших файлов, и возвращает каждый из них в виде пар (имя файла, содержимое). Это отличается от SparkContext.textFile, который возвращает по одной записи на строку в каждом файле.

Я написал код в Python и запустил задание pySpark в Datapro c:

import pyspark

sc = pyspark.SparkContext()
rdd_csv = sc.wholeTextFiles("gs://<BUCKET_NAME>/*.csv")
rdd_csv.collect()

Я вижу, что вы используете Scala язык. Пожалуйста, обратитесь к документации Spark , чтобы получить Scala фрагменты кода. Я предполагаю, что это будет примерно так:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object ReadManyFiles {
  def main(args: Array[String]) {
    if (args.length != 1) {
      throw new IllegalArgumentException(
          "1 argument is required: <inputPath> ")
    }

    val inputPath = args(0)

    val sc = new SparkContext(new SparkConf().setAppName("<APP_NAME>"))
    val rdd_csv = sc.wholeTextFiles(inputPath)
    rdd_csv.collect()
  }
}

, где можно указать inputPath при запуске задания Dataflow (или вы можете жестко запрограммировать его в файле .scala):

gcloud dataproc jobs submit spark \ 
    --cluster=${CLUSTER} \ 
    --class <CLASS> \ 
    --jars gs://${BUCKET_NAME}/<PATH>.jar \ 
    -- gs://${BUCKET_NAME}/input/

Надеюсь, это тебе поможет. Если у вас есть еще вопросы, пожалуйста, задавайте.

...