У меня на Месосе работает потоковая передача искр.Все его партии занимают одно и то же время, и это время намного дольше, чем ожидалось.Задания извлекают данные из kafka, обрабатывают данные и вставляют их в cassandra, а затем снова возвращают в kafka в отдельную тему.
Каждый пакет (ниже) имеет 3 задания, 2 из которых извлекают из kafka, обрабатывают и вставляютв Кассандру, а другой тянет из Кафки, обрабатывает и толкает обратно в Кафку.
Я проверил партию в интерфейсе искры и обнаружил, что все они занимают одно и то же время (4 с), но сверление больше, онина самом деле они обрабатываются менее секунды, но все они имеют одинаковый промежуток времени (около 4 секунд).Добавление большего количества исполнителей или большей вычислительной мощности не будет иметь значения.
Details of batch: Processing time = 12s & total delay = 1.2 s
??
Так что я углубляюсь в каждую работупартия (все они занимают одинаковое время = 4 с, даже если они выполняют разную обработку):
Всем им нужно 4 секунды, чтобы запустить одну из своих сцен (ту, которая читает с кафки).Теперь я углубляюсь в стадию одного из них (все они очень похожи):
Зачем это ждать?Все это на самом деле занимает всего 0,5 секунды, это просто ожидание.Это ждет Кафку?
Кто-нибудь испытывал что-нибудь подобное?Что я мог неправильно кодировать или настроить?
РЕДАКТИРОВАТЬ:
Вот минимальный код, который запускает это поведение.Это заставляет меня думать, что это должна быть установка.
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf(true)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "####,####,####",
"group.id" -> "test"
)
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
streamingContext, kafkaParams, Set("test_topic")
)
stream.map(t => "LEN=" + t._2.length).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
Даже если все исполнители находятся в одном и том же узле (spark.executor.cores=2 spark.cores.max=2
), проблема все еще существует, и это ровно 4 секунды, как и раньше: Один исполнитель мезо
Даже если в теме нет сообщений (пакет из 0 записей), потоковая передача с пламенем занимает 4 секунды для каждого пакета.
Единственный способ, которым яудалось исправить это, установив cores=1
и cores.max=1
, чтобы он создавал только одну задачу для выполнения.
Эта задача имеет локальность NODE_LOCAL
.Таким образом, кажется, что когда NODE_LOCAL
выполнение выполняется мгновенно, но когда Locality равно ANY
, требуется 4 секунды для подключения к kafka.Все машины находятся в одной сети 10Gb.Любая идея, почему это будет?