Автономное потребительское выбрасывание InvalidGroupIdException - PullRequest
0 голосов
/ 05 марта 2020

Это вопрос из двух частей.

I. Код:

import java.time.Duration
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._
import io.StdIn._

object StandaloneConsumer {
  private final var topics = ""
  private final val BOOTSTRAPSERVERS = "hostname:9092"

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])

    val consumer = new KafkaConsumer[String, String](props)
    topics = readLine("Enter the topic name: ")

    try {
      val partitionInfos = consumer.partitionsFor(topics)
      val topicPartitions = new util.ArrayList[TopicPartition]
      if(partitionInfos != null) {
        for (partitionInfo <- partitionInfos.asScala) {
          topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
        }
        consumer.assign(topicPartitions)
        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))
          for(record <- records.asScala) {
            println(s"key = ${record.key}, value = ${record.value()}")
          }
          consumer.commitAsync()
        }
      }
    } catch {
      case e:Exception => e.printStackTrace()
    } finally {
      consumer.commitSync()
      consumer.close()
    }
  }
}

Когда я запускаю этот , он правильно потребляет все данные , но в конце выдает:

org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

эту ошибку. Почему?

II. Я не понял эту часть кода. Что происходит ?:

val topicPartitions = new util.ArrayList[TopicPartition]
  if(partitionInfos != null) {
    for (partitionInfo <- partitionInfos.asScala) {
      topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
    }

Может кто-нибудь, пожалуйста, объясните мне это. Спасибо.

1 Ответ

0 голосов
/ 05 марта 2020

В Kafka фиксация имеет смысл только в том случае, если вы являетесь частью группы потребителей (и затем позволяете координаторам кластера Kafka управлять назначением раздела и операциями извлечения / фиксации смещения). Что касается вашего кода, похоже, что вы делаете ручное назначение разделов, но все же вы пытаетесь зафиксировать:

          consumer.commitAsync()

Некоторая документация по группе потребителей:

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

На ваш вопрос II

Этот код выбирает другой раздел, принадлежащий вашей топи c, а затем вы назначаете эти разделы своему потребителю.

Вам следует прочитать немного больше о группах потребителей Kafka и быть уверенным, хотите ли вы этого, или нет, если вы хотите использовать его или самостоятельно обрабатывать назначение разделов. Я настоятельно рекомендую использовать его (в целях масштабируемости), за исключением случаев, когда вы действительно хотите его пропустить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...