Исключение Kafka Stream при очистке хранилища состояний при использовании TopicRecordNameStrategy - PullRequest
0 голосов
/ 12 марта 2020

Я использую процессор API для очистки хранилища состояний

@Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long l) {
                processorContext.commit();
            }
        }); //invoke punctuate every 12 seconds
        this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
        log.info("Processor initialized");
    }

    @Override
    public void process(String key, GenericRecord value) {
        statestore.all().forEachRemaining(keyValue -> {
            statestore.delete(keyValue.key);
        });
    }

Я использую TopicRecordNameStrategy, поскольку мы транслируем вывод нескольких слушателей на общий слушатель -

spring.cloud.stream.kafka.streams.binder.configuration.value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Этот код выбрасывает исключение -

2020-03-12 11:58:22.306 EDT ERROR 51635 - [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] stream-thread [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] Failed to process stream task 0_0 due to the following error:
    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=composite, partition=0, offset=25
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

      2020-03-12 11:58:22.332 EDT  INFO 51635 - [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] stream-thread [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] Shutdown complete
      Exception in thread "repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=composite, partition=0, offset=25
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
      Caused by: org.apache.kafka.common.errors.SerializationException: In configuration value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicRecordNameStrategy, the message value must only be an Avro record schema

Заранее большое спасибо за вашу помощь, ценится любое понимание: -)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...