Я использую kafkaUtils.createDirectStream () в потоковой передаче искры. Этот метод поможет мне создать потребителя группы kafka (идентификатор группы передается в качестве параметра). И группа подписывается на несколько тем . каждая тема имеет несколько разделов . Эта структура кажется действительно хрупкой , потому что, когда я вручную закрываю некоторых брокеров kafka, делая некоторые разделы тем недоступными, потребитель будет заблокирован ! что приводит к блокировке всего процесса потребления! почему createDirectStream () не создает нескольких потребителей, так что, когда некоторые потребители некоторых разделов блокируются, другие потребители в этой группе все еще могут получать сообщения, так что не блокируется весь поток?
в Руководство по интеграции Spark Streaming + Kafka ,
Подход Direct Stream. Он обеспечивает простой параллелизм, соответствие 1: 1 между разделами Kafka и разделами Spark и> доступ к смещениям и метаданным.
Из этого описания я предполагаю, что createDirectStream () создаст несколько потребителей, один потребитель для одного раздела , что приводит к высокой доступности и одновременности. Но когда я проверяю состояние группы потребителей с помощью сценария kafka cml, я замечаю, что есть только один созданный потребитель ! когда я закрывал некоторых брокеров, чтобы один раздел не был доступен, весь процесс потоковой передачи блокировался! Можно ли создать несколько потребителей в группе с помощью createDirectStream ()?
Вот как я использую createDirectStream
val kafkaParams = Map[String,String] (
"bootstrap.servers" -> brokers,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "false",
"group.id" -> groupId
)
val locationStrategy = LocationStrategies.PreferConsistent
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
val messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
Вот как я проверяю состояние группы потребителей:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.103.236:9092,192.168.103.237:9092,[B192.168.103.238:9092 --describe --group spark_streaming_group_test
Я обнаружил, что создан только один потребитель: информация о теме: , когда я вручную выключаю брокер 0 с помощью kill -9, он работает нормальнопосле восстановления баланса (каждый раздел остается доступным из-за реплик) Затем я закрываю другого брокера, в результате чего раздел 2 недоступен: Затем весь процесс потоковой передачи блокируется! (нет новых пакетов после 11:34:40, добавленных в список задач в интерфейсе Spark Streaming)
Но это Это не то, что я предполагаю ! Я полагаю, что потребительский клиент продолжает получать сообщения из темы ub_read_log из разделов 0 и 1, поскольку они доступны! . Более того, я предполагаю, что сообщения других тем не блокируются. Но каждая вещь заблокирована! Я думаю, что это потому, что createDirectStream () создает только один потребитель. Если будет создано несколько потребителей, все будет хорошо, не так ли? Что-то не так с моим наблюдением? любые предложения приветствуются, большое спасибо!