идентификатор группы потребителей по умолчанию в кафке - PullRequest
0 голосов
/ 17 января 2019

Я работаю с Kafka 2.11 и довольно плохо знаком с ним. Я пытаюсь понять группы потребителей kafka, у меня есть 3 спарк-приложения, использующие одну и ту же тему, и каждое из них получает все сообщения из этой темы. Поскольку я не упоминал ни одного идентификатора группы потребителей в приложениях, я предполагаю, что Kafka назначает какой-то отдельный идентификатор группы потребителей каждому из них. Мне нужно сбросить смещение Кафки для одного из приложений, используя команду ниже. Поскольку я не знаю имя группы потребителей своего приложения, я застрял здесь. Нужно ли явно назначать идентификатор группы в приложении, а затем использовать его в приведенной ниже команде?

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute

Если это правда, как я могу получить идентификатор группы потребителей каждого приложения? Я не могу

Ответы [ 3 ]

0 голосов
/ 17 января 2019

Поскольку я не упоминал ни одного идентификатора группы потребителей в приложениях, я предполагаю, что Kafka назначает какой-то отдельный идентификатор группы потребителей для каждого из них

Брокеры Kafka не присваивают имена групп потребителей подключенным к ним потребителям. Когда потребитель подключается, подписываясь на тему, он «присоединяется» к группе. Если вы используете приложение Spark без указания какой-либо группы потребителей, это означает, что каким-то образом библиотека / инфраструктура, которую вы используете для подключения к Kafka из приложения Spark, сама присваивает имена групп потребителей.

0 голосов
/ 17 января 2019

Если вы перейдете в код Spark, вы найдете класс KafkaSourceProvider, отвечающий за программу чтения исходного кода Kafka, вы увидите, что генерируется случайный group.id:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

  override def createSource(
    sqlContext: SQLContext,
    metadataPath: String,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): Source = {
      validateStreamOptions(parameters)
      // Each running query should use its own group id. Otherwise, the query may be only assigned
      // partial data since Kafka will assign partitions to multiple consumers having the same group
      // id. Hence, we should generate a unique id for each query.
      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    ...
  }

Вы можете искать group.id с префиксом spark-kafka-source, но вы не можете найти group.id для конкретной группы.

Чтобы найти все идентификаторы групп потребителей, вы можете использовать следующую команду: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list

Чтобы проверить смещения групп потребителей, вы можете использовать следующую команду: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe

0 голосов
/ 17 января 2019

Потребитель group.id обязателен. Если вы не установите потребитель group.id, вы получите исключение. Итак, очевидно, что вы устанавливаете его где-то в своем коде, или фреймворк или библиотека, которую вы используете, устанавливает его внутри. Вы всегда должны установить group.id самостоятельно.

Вы можете получить идентификаторы группы потребителей, используя следующую команду:

bin/kafka-consumer-groups.sh  --list --bootstrap-server <kafka-broker-ip>:9092
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...