почему createDirectStream создает только одного потребителя? (что приводит к низкой доступности !!) - PullRequest
0 голосов
/ 04 ноября 2019

Я использую kafkaUtils.createDirectStream () в потоковой передаче искры. Этот метод поможет мне создать потребителя группы kafka (идентификатор группы передается в качестве параметра). И группа подписывается на несколько тем . каждая тема имеет несколько разделов . Эта структура кажется действительно хрупкой , потому что, когда я вручную закрываю некоторых брокеров kafka, делая некоторые разделы тем недоступными, потребитель будет заблокирован ! что приводит к блокировке всего процесса потребления! почему createDirectStream () не создает нескольких потребителей, так что, когда некоторые потребители некоторых разделов блокируются, другие потребители в этой группе все еще могут получать сообщения, так что не блокируется весь поток?

в Руководство по интеграции Spark Streaming + Kafka ,

Подход Direct Stream. Он обеспечивает простой параллелизм, соответствие 1: 1 между разделами Kafka и разделами Spark и> доступ к смещениям и метаданным.

Из этого описания я предполагаю, что createDirectStream () создаст несколько потребителей, один потребитель для одного раздела , что приводит к высокой доступности и одновременности. Но когда я проверяю состояние группы потребителей с помощью сценария kafka cml, я замечаю, что есть только один созданный потребитель ! когда я закрывал некоторых брокеров, чтобы один раздел не был доступен, весь процесс потоковой передачи блокировался! Можно ли создать несколько потребителей в группе с помощью createDirectStream ()?

Вот как я использую createDirectStream

val kafkaParams = Map[String,String] (
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false",
      "group.id" -> groupId
    )
    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)

Вот как я проверяю состояние группы потребителей:

 bin/kafka-consumer-groups.sh --bootstrap-server 192.168.103.236:9092,192.168.103.237:9092,[B192.168.103.238:9092 --describe --group spark_streaming_group_test

Я обнаружил, что создан только один потребитель: enter image description here информация о теме: enter image description here, когда я вручную выключаю брокер 0 с помощью kill -9, он работает нормальнопосле восстановления баланса (каждый раздел остается доступным из-за реплик) enter image description here Затем я закрываю другого брокера, в результате чего раздел 2 недоступен: enter image description here Затем весь процесс потоковой передачи блокируется! (нет новых пакетов после 11:34:40, добавленных в список задач в интерфейсе Spark Streaming) enter image description here

Но это Это не то, что я предполагаю ! Я полагаю, что потребительский клиент продолжает получать сообщения из темы ub_read_log из разделов 0 и 1, поскольку они доступны! . Более того, я предполагаю, что сообщения других тем не блокируются. Но каждая вещь заблокирована! Я думаю, что это потому, что createDirectStream () создает только один потребитель. Если будет создано несколько потребителей, все будет хорошо, не так ли? Что-то не так с моим наблюдением? любые предложения приветствуются, большое спасибо!

...