Если вы перейдете в код Spark, вы найдете класс KafkaSourceProvider
, отвечающий за программу чтения исходного кода Kafka, вы увидите, что генерируется случайный group.id:
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
...
}
Вы можете искать group.id с префиксом spark-kafka-source
, но вы не можете найти group.id для конкретной группы.
Чтобы найти все идентификаторы групп потребителей, вы можете использовать следующую команду:
./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list
Чтобы проверить смещения групп потребителей, вы можете использовать следующую команду:
./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe