Долгое и последовательное ожидание между задачами в работе потокового воспроизведения - PullRequest
0 голосов
/ 07 июня 2018

У меня на Месосе работает потоковая передача искр.Все его партии занимают одно и то же время, и это время намного дольше, чем ожидалось.Задания извлекают данные из kafka, обрабатывают данные и вставляют их в cassandra, а затем снова возвращают в kafka в отдельную тему.

Каждый пакет (ниже) имеет 3 задания, 2 из которых извлекают из kafka, обрабатывают и вставляютв Кассандру, а другой тянет из Кафки, обрабатывает и толкает обратно в Кафку.

Я проверил партию в интерфейсе искры и обнаружил, что все они занимают одно и то же время (4 с), но сверление больше, онина самом деле они обрабатываются менее секунды, но все они имеют одинаковый промежуток времени (около 4 секунд).Добавление большего количества исполнителей или большей вычислительной мощности не будет иметь значения.

Details of batch: Processing time = 12s & total delay = 1.2 s ??

Так что я углубляюсь в каждую работупартия (все они занимают одинаковое время = 4 с, даже если они выполняют разную обработку):

Job 175s

Job 1753

Job 1754

Всем им нужно 4 секунды, чтобы запустить одну из своих сцен (ту, которая читает с кафки).Теперь я углубляюсь в стадию одного из них (все они очень похожи):

Details for stage 2336

Зачем это ждать?Все это на самом деле занимает всего 0,5 секунды, это просто ожидание.Это ждет Кафку?

Кто-нибудь испытывал что-нибудь подобное?Что я мог неправильно кодировать или настроить?

РЕДАКТИРОВАТЬ:

Вот минимальный код, который запускает это поведение.Это заставляет меня думать, что это должна быть установка.

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Даже если все исполнители находятся в одном и том же узле (spark.executor.cores=2 spark.cores.max=2), проблема все еще существует, и это ровно 4 секунды, как и раньше: Один исполнитель мезо

Даже если в теме нет сообщений (пакет из 0 записей), потоковая передача с пламенем занимает 4 секунды для каждого пакета.

Единственный способ, которым яудалось исправить это, установив cores=1 и cores.max=1, чтобы он создавал только одну задачу для выполнения.

Эта задача имеет локальность NODE_LOCAL.Таким образом, кажется, что когда NODE_LOCAL выполнение выполняется мгновенно, но когда Locality равно ANY, требуется 4 секунды для подключения к kafka.Все машины находятся в одной сети 10Gb.Любая идея, почему это будет?

1 Ответ

0 голосов
/ 08 июня 2018

Проблема была с spark.locality.wait, эта ссылка дала мне идею

Его значение по умолчанию составляет 3 секунды, и это занимало все это время для каждой партии, обработанной в sparkstreaming.

Я установил его на 0 секунд при отправке задания с помощью Mesos (--conf spark.locality.wait=0), и теперь все работает как ожидалось.

...