Kafka Stream Exception: GroupAuthorizationException - PullRequest
1 голос
/ 27 мая 2019

Я занимаюсь разработкой приложения Kafka-Stream, которое будет читать сообщения из входной темы Kafka, фильтровать ненужные данные и выдвигать для вывода темы Kafka.

Конфигурация Kafka Stream:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}

KStream Filter Logic:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
    KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));

    filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
    /** After filtering printing the same message */
    filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
    return stream;
}

При запуске над приложением Kafka Stream на основе пружины я получал исключение ниже.

2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

Наша команда Kafka Infra предоставила необходимое разрешение для «group.id», используя тот же «идентификатор группы», что я могу получить сообщение, используя другие приложения Kafka Consumer, и Я использовал имя по своему желанию в «приложении». идентификатор ". Мы не добавляем / не обновляем "application.id" в Kafka Access Control List.

Я действительно не уверен, что нам нужно дать какое-либо разрешение для «application.id», или я что-то упустил в Конфигурации Kafka Stream. Пожалуйста, совет.

Обратите внимание: я пытался использовать с «group.id» и без «group.id» в конфигурации потока Kafka, все время получаю одно и то же исключение.

Спасибо! Бхаратираджа Шанмугам

Ответы [ 2 ]

1 голос
/ 30 мая 2019

Нам также нужно настроить доступ для application.id. Для получения дополнительной информации, пожалуйста, обратитесь -> https://docs.confluent.io/current/streams/developer-guide/security.html

Обязательный параметр ACL для защищенных кластеров Kafka Кластеры Kafka могут использовать списки ACL для управления доступом к ресурсам (например, возможностью создавать темы), и для таких кластеров каждый клиент, включая Kafka Streams, должен проходить аутентификацию в качестве конкретного пользователя для авторизации с соответствующим доступом. В частности, когда приложения Streams запускаются на защищенном кластере Kafka, у участника, на котором запущено приложение, должен быть установлен ACL, чтобы у приложения были разрешения на создание внутренних тем. Поскольку все внутренние темы, а также имя встроенной группы потребителей имеют префикс идентификатора приложения, рекомендуется использовать списки управления доступом к шаблону ресурса с префиксом для настройки списков управления, чтобы позволить клиенту управлять всеми темами и группами потребителей, запущенными с этим префиксом, как - Префикс с типом шаблона ресурса --topic --operation All (подробности см. в KIP-277 и KIP-290). Например, с учетом следующей настройки вашего приложения Streams: • Config application.id - это team1-streams-app1. • Аутентификация с помощью кластера Kafka как пользователя team1. • Кодированная топология приложения считывает из тем ввода input-topic1 и input-topic2. • Запись топологии приложения в выходные темы output-topic1 и output-topic2. Затем следующие команды создадут необходимые ACL-списки в кластере Kafka, чтобы ваше приложение могло работать:

Разрешить потокам читать темы ввода:

bin / kafka-acls ... --add --allow-Principal Пользователь: team1 --operation Read --topic input-topic1 --topic input-topic2

Разрешить потокам записывать в выходные темы:

bin / kafka-acls ... --add --allow-Principal Пользователь: team1 --operation Записать --topic output-topic1 --topic output-topic2

Разрешить потокам управлять своими внутренними темами и группами потребителей:

bin / kafka-acls ... --add --allow-Principal Пользователь: team1 --operation All --resource-pattern-type prefixed --topic team1-streams-app1 --group team1-streams-app1

1 голос
/ 27 мая 2019

Я не за столом, но я думаю, что Streams устанавливает для group.id значение application.id.

...