Я занимаюсь разработкой приложения Kafka-Stream, которое будет читать сообщения из входной темы Kafka, фильтровать ненужные данные и выдвигать для вывода темы Kafka.
Конфигурация Kafka Stream:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
return new KafkaStreamsConfiguration(streamsConfiguration);
}
KStream Filter Logic:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/** Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));
filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
/** After filtering printing the same message */
filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
return stream;
}
При запуске над приложением Kafka Stream на основе пружины я получал исключение ниже.
2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0
Наша команда Kafka Infra предоставила необходимое разрешение для «group.id», используя тот же «идентификатор группы», что я могу получить сообщение, используя другие приложения Kafka Consumer, и Я использовал имя по своему желанию в «приложении». идентификатор ". Мы не добавляем / не обновляем "application.id" в Kafka Access Control List.
Я действительно не уверен, что нам нужно дать какое-либо разрешение для «application.id», или я что-то упустил в Конфигурации Kafka Stream. Пожалуйста, совет.
Обратите внимание: я пытался использовать с «group.id» и без «group.id» в конфигурации потока Kafka, все время получаю одно и то же исключение.
Спасибо!
Бхаратираджа Шанмугам