В соответствии с этим https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html создание отдельного экземпляра потребителя kafka с той же группой приведет к перебалансированию разделов.Я считаю, что этот баланс не терпится потребителем.Как мне исправить это
Теперь все разделы используются только одним потребителем.Если скорость приема данных высока, потребитель может не спешить потреблять данные со скоростью загрузки.
Добавление большего количества потребителей в ту же группу потребителей для потребленияДанные из темы и увеличить уровень потребления.Spark Streaming с использованием этого подхода 1: 1 параллелизм между разделами Kafka и разделами Spark.Spark будет обрабатывать его внутренне.
Если число пользователей превышает количество разделов по темам, оно будет в состоянии простоя, а ресурсы используются не полностью.Всегда рекомендуется, чтобы потребитель был меньше или равен количеству разделов.
Kafka будет перебалансирован, если будет добавлено больше процессов / потоков.ZooKeeper может быть перенастроен с помощью кластера Kafka, если какой-либо потребитель или брокер не может отправить пульс ZooKeeper.
Kafka перебалансирует хранилище разделов при сбое любого брокера или добавлении нового раздела всуществующая тема.Это специфично для Кафки, как сбалансировать данные между разделами в посредниках.
Потоковая передача Spark обеспечивает простой параллелизм 1: 1 между разделами Kafka и разделами Spark.Если вы не предоставляете какие-либо сведения о разделах с помощью ConsumerStragies.Assign, использует из всех разделов данной темы.
Kafka назначает разделы темы потребителю в группе, так что каждый раздел потребляется ровно одним потребителем в группе.Kafka гарантирует, что сообщение будет когда-либо прочитано только одним потребителем в группе.
Когда вы запускаете второе задание потоковой передачи, другой потребитель пытается использовать тот же раздел у того же потребителя.GroupID.Так что выдает ошибку.
val alertTopics = Array("testtopic")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> sparkJobConfig.kafkaConsumerGroup,
"auto.offset.reset" -> "latest"
)
val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))
val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))
Если вы хотите использовать задание искры для конкретного раздела, используйте следующий код:
val topicPartitionsList = List(new TopicPartition("topic",1))
val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies
Потребители могут присоединиться к группе с помощью samegroup.id.
val topicPartitionsList = List(new TopicPartition("topic",3), new TopicPartition("topic",4))
val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
Добавление еще двух потребителей - это добавление в один и тот же groupid.
Пожалуйста, прочитайте Spark-Kafkaруководство по интеграции.https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
Надеюсь, это поможет.