In application #1 I create a KTable:
KTable<Windowed<String>, GenericRecord> table = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(windowMs))
    .aggregate(
        () -> new GenericData.Record(schema),  // initializer is a supplier; GenericRecord itself is an interface
        (key, value, aggregate) -> ..,         // aggregator body elided
        Materialized.<String, GenericRecord, WindowStore<Bytes, byte[]>>as("window-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(new GenericAvroSerde()));
In another application, #2, this channel is consumed as follows:
public void process(@Input(Channels.INPUT) KTable<Windowed<String>, GenericRecord> input) {
    input
        .toStream((key, value) -> key.key())
        .foreach((key, value) -> {
            LOGGER.info(value);
        });
}
In both applications the default key serde is String:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.binder.application-id: demo-service
spring.cloud.stream.bindings.input.destination: demo-service-window-store-changelog
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.bindings.input.consumer.headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.materializedAs: window-store
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id: ${random.int}
When I run application #2, I get the following exception:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
It looks like I need to set a windowed serde over String for the key in application #2. Does anyone know how I can set that?
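To illustrate why the key shows up as a String rather than a Windowed<String>: as far as I understand, the changelog topic of a window store holds each key as the serialized record key followed by an 8-byte window start timestamp and a 4-byte sequence number. A plain StringSerde decodes that whole byte array as text, so the cast to Windowed in the toStream mapper fails. The sketch below is plain Java with no Kafka dependency. The class WindowedKeyLayout and its methods are hypothetical names, and the byte layout is an assumption about Kafka Streams internals that may differ between versions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: models the ASSUMED changelog key layout
// [UTF-8 key bytes][8-byte window start][4-byte sequence number].
final class WindowedKeyLayout {
    static final int TIMESTAMP_SIZE = 8; // window start, epoch millis, big-endian
    static final int SEQNUM_SIZE = 4;    // sequence number suffix

    // Simple value holder for the decoded parts of a windowed key.
    static final class DecodedKey {
        final String key;
        final long windowStart;
        final int seqnum;

        DecodedKey(String key, long windowStart, int seqnum) {
            this.key = key;
            this.windowStart = windowStart;
            this.seqnum = seqnum;
        }
    }

    // Builds a byte array in the assumed layout.
    static byte[] encode(String key, long windowStart, int seqnum) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(keyBytes)
                .putLong(windowStart)
                .putInt(seqnum)
                .array();
    }

    // Splits the byte array back into key, window start and sequence number.
    static DecodedKey decode(byte[] raw) {
        int keyLength = raw.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
        if (keyLength < 0) {
            throw new IllegalArgumentException("too short to be a windowed key");
        }
        ByteBuffer buffer = ByteBuffer.wrap(raw);
        byte[] keyBytes = new byte[keyLength];
        buffer.get(keyBytes);
        long windowStart = buffer.getLong();
        int seqnum = buffer.getInt();
        return new DecodedKey(new String(keyBytes, StandardCharsets.UTF_8), windowStart, seqnum);
    }

    public static void main(String[] args) {
        byte[] raw = encode("user-1", 1500000000000L, 0);

        // What a plain String deserializer would do: treat all bytes as text.
        String naive = new String(raw, StandardCharsets.UTF_8);
        System.out.println("naive key equals \"user-1\": " + naive.equals("user-1"));

        // What a window-aware deserializer has to do instead.
        DecodedKey decoded = decode(raw);
        System.out.println("key=" + decoded.key + ", windowStart=" + decoded.windowStart);
    }
}
```

In a real application this decoding would be done by a windowed serde rather than by hand. Newer Kafka Streams releases ship WindowedSerdes and TimeWindowedSerde classes for this purpose; whether they are available, and how to wire them into the binder, depends on the versions in use.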