Spark Local получает лучшую производительность, чем Cluster Mode - PullRequest
1 голос
/ 10 июня 2019

В настоящее время я использую Spark с DataFrames для создания серии запросов HTTP GET для каждой строки.В настоящее время я использую зависимость Apache HttpClient 4.5.x для выполнения запросов GET.Я заметил, что когда я работаю в локальном режиме (--master local[256]), я могу получить довольно большое количество GET / сек (приблизительно 100 / сек).Однако (что неудивительно), когда я пытаюсь масштабировать последние 256 потоков локальных драйверов, производительность начинает снижаться.Я могу запустить 256 локальных потоков на своем 4-ядерном ноутбуке из-за длительной блокировки ввода-вывода от HttpClient.

Затем я переключился на кластер Spark с 5 рабочими узлами в AWS EMR (с каждым рабочимузел, имеющий вдвое больше ядер, чем мой ноутбук), надеясь эмулировать эмуляцию той же настройки 256 потоков на узел, чтобы попытаться увеличить пропускную способность в 5 раз.spark-submit не запускается, когда я установил для параметра --executor-cores значение, превышающее число доступных vCores, сообщенных YARN.

Существует ли способ легко воспроизвести локальное [256] поведение 5 рабочих, чтобы получитьпримерное увеличение пропускной способности в 5 раз?

Я пробовал несколько вариантов с результатами ниже среднего.

  1. Я попытался настроить автономный кластер Dockerized Spark и обновил SPARK_WORKER_CORESочень большое число с ограниченным успехом.Хотя теперь я могу раскрутить 128 потоков на одного работника, производительность ухудшается.Прошло несколько часов с момента его попытки, поэтому я не могу вспомнить, происходило ли большое количество случайных перемещений или нет.
  2. Я обновил свой код для реализации шаблона асинхронного скользящего окна .Это была удивительная статья, которая выглядела многообещающе, но даже с настройкой на 5 рабочих узлов я все еще не могу выполнить столько GET, сколько local[256] (хотя это близко).Примечание: пост объясняет обновленный «скользящий итератор», который я не смог преобразовать (с тех пор он изменил свой код, чтобы использовать предоставленную Google библиотеку Futures, и я пытался придерживаться версии scala.concurrent).
  3. Я также пытался использовать библиотеку Apache HttpAsyncClient 4.1.x, но это, похоже, не имело большого значения.

Это текущая база кода, которая была написанадля настройки 5-рабочего узла.Настройка local[256] делает наивное df.map(row => getRow(row)

package com.me.bot

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.spark.sql.{Row, SparkSession}

object Bot {
  val MaxConcurrency: Int = 128

  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("Bot").getOrCreate()
    import spark.implicits._

    val df = spark.read.option("header", "true").csv("/path/to/urls.csv")
    df.rdd.
      map(row => ThreadedConcurrentContext.executeAsync(getRow(row))).
      mapPartitions(it => ThreadedConcurrentContext.awaitSliding(it, MaxConcurrency)).
      toDF().write.mode("append").parquet("/path/to/output")

    spark.stop()
  }

  def getRow(row: Row) = {
    val url = row.getAs[String]("url")
    GetResult(Browser.getStatusCode(url), url)
  }
}

case class GetResult(statusCode: Int, url: String)

object Browser extends Serializable {
  lazy val cm = {
    val manager = new PoolingHttpClientConnectionManager()
    manager.setMaxTotal(Bot.MaxConcurrency)
    manager.setDefaultMaxPerRoute(Bot.MaxConcurrency)
    manager
  }

  lazy val httpClient = HttpClients.custom().setConnectionManager(cm).setConnectionTimeToLive(5, TimeUnit.MINUTES).build()

  val context = new ThreadLocal[HttpClientContext] {
    override def initialValue = HttpClientContext.create()
  }

  def getStatusCode(url: String) = {
    var response: CloseableHttpResponse = null

    try {
      val httpget = new HttpGet(url)
      response = httpClient.execute(httpget, context.get())
      response.getStatusLine.getStatusCode
    }
    catch {
      case ex: Exception => -1
      case _: Throwable => -1
    }
    // soooo not Scala... sorry!
    finally {
      if (response != null) {
        response.close()
      }
    }
  }
}

// From http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/
/** A singleton object that controls the parallelism on a Single Executor JVM */
object ThreadedConcurrentContext {
  import scala.concurrent._
  import scala.concurrent.duration.Duration
  import scala.concurrent.duration.Duration._

  implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(Bot.MaxConcurrency))

  /** Wraps a code block in a Future and returns the future */
  def executeAsync[T](f: => T): Future[T] = Future(f)

  /** Awaits only a set of elements at a time. Instead of waiting for the entire batch
    * to finish waits only for the head element before requesting the next future*/
  def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int, timeout: Duration = Inf): Iterator[T] = {
    val slidingIterator = it.sliding(batchSize - 1) //Our look ahead (hasNext) will auto start the nth future in the batch
    val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
    initIterator.map( futureBatch => Await.result(futureBatch.head, timeout)) ++
      tailIterator.flatMap( lastBatch => Await.result(Future.sequence(lastBatch), timeout))
  }

  def awaitAll[T](it: Iterator[Future[T]], timeout: Duration = Inf) = {
    Await.result(Future.sequence(it), timeout)
  }
}

Я хотел бы получить пропускную способность 5x, используя 5 рабочих узлов.У меня нет планов масштабироваться намного дальше этого, поскольку мой веб-сервер, вероятно, не может справиться с гораздо большим.

...