Java: OpenJdk 11 Kafka: 2.2.0 Kafka streams lib: 2.3.0
Я пытаюсь развернуть свое приложение Kafka streams в контейнере docker , но при попытке выполнитьсоздайте внутреннее хранилище состояний с исключением TopicAuthorizationException. Это хорошо работает на местном уровне. Основное различие между локально и на сервере заключается в том, что там он подключается к серверу, развернутому Kafka, и аутентифицируется с использованием обычной Kerberos аутентификации. Я не могу понять связь между аутентификацией и локальными хранилищами .
Мой поток выглядит так:
StreamsBuilder builder = new StreamsBuilder();
//We stream from the source topic
KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
.with(Serdes.serdeFrom(String.class), INPUT_SERDE));
//We group per room and window
TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
.groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));
//We make them a list
KStream<Windowed<String>, WindowedMessages> grouped = windowed
.aggregate(WindowedMessages::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream();
//Filter
KStream<Windowed<String>, FilterResult> filtered = grouped
.mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));
//Re map to its original form
KStream<String, OutputPayload> reduced = filtered
.flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
.getMessages()
.stream().map(payload -> new KeyValue<>(key.key(), payload))
.collect(toList()));
//Target topic
reduced.to(sinkTopic, Produced
.with(Serdes.serdeFrom(String.class), SERDE));
return builder.build();
Он получает поток сообщений, Windows это,агрегирует все сообщения в окне, сохраняет только последнюю версию списка с «Подавленным», а затем записывает целое, чтобы переслать его в другую тему.
Каждый раз, когда я получаю такое исключение:
Сообщение об ошибке было: org.apache.kafka.common.errors.TopicAuthorizationException: не авторизован для доступа к темам: [Ошибка авторизации темы.] 2019-10-09 06: 44: 03.255 +0000 ОШИБКА [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Обнаружено исключение, возникшее во время обработки неожиданной Kafобычно это указывает на внутренние ошибки потоков: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - [] org.apache.kafka.streams.errors.StreamsException: Не удалось создать тему фильтра-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog. в org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions (InternalTopicManager.java:212) в org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics (InternalTopicMapager) orj.kafka.streams.processor.internals.InternalTopicManager.makeReady (InternalTopicManager.java:104) в org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic (StreamsPartitionAssignor.jkaka71kakaka7171) atg71: 9.processor.internals.StreamsPartitionAssignor.assign (StreamsPartitionAssignor.java:618) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment (ConsumerCoordinator.java:4merc.kakakakakakakasuka.kaf.sucong. .AbstractCoordinator.onJoinLeader (AbstractCoordinator.java:622) по адресу org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access $ 1100 (AbstractCoordinator.java:107) по адресу org.apache.kafka.clientsCo. JoinGroupResponseHandler.handle (AbstractCoordinator.java:544) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ JoinGroupResponseHandler.handle (AbstractCoordinator.java:527) в org.apache.kafka.clients.consumer.internalsCoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:978) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:958) в org.apmerin.conf. 1 $. .onSuccess (RequestFuture.java:204) в org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess (RequestFuture.java:167) в org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java: 127. : 388) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:294) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient :233)в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:212) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCo) или atg.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:358) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:35aplikakakakakafka.kag или.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded (KafkaConsumer.java:1251) в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1216) в org.sumer.java: 1201) в org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:941) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:846) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:805) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:774) Причина: org.apache.kafka.common.errors.TopicAuthorizationException: не авторизован для доступа к темам: [Ошибка авторизации темы.]