Поток Kafka: «TopicAuthorizationException: не авторизован для доступа к темам» для внутреннего хранилища состояний - PullRequest
3 голосов
/ 09 октября 2019

Java: OpenJdk 11 Kafka: 2.2.0 Kafka streams lib: 2.3.0

Я пытаюсь развернуть свое приложение Kafka streams в контейнере docker , но при попытке выполнитьсоздайте внутреннее хранилище состояний с исключением TopicAuthorizationException. Это хорошо работает на местном уровне. Основное различие между локально и на сервере заключается в том, что там он подключается к серверу, развернутому Kafka, и аутентифицируется с использованием обычной Kerberos аутентификации. Я не могу понять связь между аутентификацией и локальными хранилищами .

Мой поток выглядит так:

StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
                .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

        //Re map to its original form
        KStream<String, OutputPayload> reduced = filtered
                .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                        .getMessages()
                        .stream().map(payload -> new KeyValue<>(key.key(), payload))
                        .collect(toList()));


        //Target topic
        reduced.to(sinkTopic, Produced
                .with(Serdes.serdeFrom(String.class), SERDE));

        return builder.build();

Он получает поток сообщений, Windows это,агрегирует все сообщения в окне, сохраняет только последнюю версию списка с «Подавленным», а затем записывает целое, чтобы переслать его в другую тему.

Каждый раз, когда я получаю такое исключение:

Сообщение об ошибке было: org.apache.kafka.common.errors.TopicAuthorizationException: не авторизован для доступа к темам: [Ошибка авторизации темы.] 2019-10-09 06: 44: 03.255 +0000 ОШИБКА [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Обнаружено исключение, возникшее во время обработки неожиданной Kafобычно это указывает на внутренние ошибки потоков: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - [] org.apache.kafka.streams.errors.StreamsException: Не удалось создать тему фильтра-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog. в org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions (InternalTopicManager.java:212) в org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics (InternalTopicMapager) orj.kafka.streams.processor.internals.InternalTopicManager.makeReady (InternalTopicManager.java:104) в org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic (StreamsPartitionAssignor.jkaka71kakaka7171) atg71: 9.processor.internals.StreamsPartitionAssignor.assign (StreamsPartitionAssignor.java:618) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment (ConsumerCoordinator.java:4merc.kakakakakakakasuka.kaf.sucong. .AbstractCoordinator.onJoinLeader (AbstractCoordinator.java:622) по адресу org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access $ 1100 (AbstractCoordinator.java:107) по адресу org.apache.kafka.clientsCo. JoinGroupResponseHandler.handle (AbstractCoordinator.java:544) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ JoinGroupResponseHandler.handle (AbstractCoordinator.java:527) в org.apache.kafka.clients.consumer.internalsCoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:978) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:958) в org.apmerin.conf. 1 $. .onSuccess (RequestFuture.java:204) в org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess (RequestFuture.java:167) в org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java: 127. : 388) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:294) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient :233)в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:212) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCo) или atg.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:358) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:35aplikakakakakafka.kag или.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded (KafkaConsumer.java:1251) в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1216) в org.sumer.java: 1201) в org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:941) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:846) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:805) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:774) Причина: org.apache.kafka.common.errors.TopicAuthorizationException: не авторизован для доступа к темам: [Ошибка авторизации темы.]

1 Ответ

2 голосов
/ 09 октября 2019

Это не «аутентификация», а «авторизация». Посмотрите на ваши сообщения журнала, там написано "Нет прав доступа к темам". Насколько я вижу, вы не авторизованы для создания внутренней темы 'filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog', которая поддерживает ваше локальное хранилище состояний подавления. Государственные магазины, включенные в Kafka Streams, по умолчанию поддерживаются темой на брокерах Kafka. Эти внутренние темы используются при восстановлении после отказа для восстановления локальных хранилищ состояний. Эти внутренние темы создаются автоматически приложением Kafka Streams, поэтому приложение должно иметь соответствующие разрешения для их создания.

См. https://kafka.apache.org/23/documentation/streams/developer-guide/security.html#id1 для получения дополнительной информации. Там написано, что «основной пользователь, выполняющий приложение, должен иметь список ACL, чтобы у приложения были права на создание, чтение и запись внутренних тем».

...