Я использую процессор API для очистки хранилища состояний
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
Я использую TopicRecordNameStrategy, поскольку мы транслируем вывод нескольких слушателей на общий слушатель -
spring.cloud.stream.kafka.streams.binder.configuration.value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Этот код выбрасывает исключение -
2020-03-12 11:58:22.306 EDT ERROR 51635 - [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] stream-thread [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=composite, partition=0, offset=25
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
2020-03-12 11:58:22.332 EDT INFO 51635 - [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] stream-thread [repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1] Shutdown complete
Exception in thread "repository-service--1406394690-200b1fe1-837d-46c8-8b8f-71f3102a06c9-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=composite, partition=0, offset=25
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: In configuration value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicRecordNameStrategy, the message value must only be an Avro record schema
Заранее большое спасибо за вашу помощь, ценится любое понимание: -)