Как мы можем использовать KTable из Windowed <String>, используя потоки весенних облаков кафки? - PullRequest
0 голосов
/ 03 апреля 2019

В приложении № 1 я создаю KTable -

KTable<Windowed<String>, GenericRecord> table = stream
        .groupByKey()
        .windowedBy(TimeWindows.of(windowMs))
        .aggregate(new GenericRecord(schema),
                (key, old, new) -> ..,
                Materialized.<String, GenericRecord, WindowStore<Bytes, byte[]>>
                        as("window-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(new GenericAvroSerde));

В другом приложении № 2 этот канал используется как

public void process(@Input(Channels.INPUT) KTable<Windowed<String>, GenericRecord> input) {
input
                .toStream((key, value) -> key.key())
                .foreach((key, value) -> {
                    LOGGER.info(value);
                });
}

В обоих приложенияхпо умолчанию keySerde - String -

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

spring.cloud.stream.kafka.streams.binder.application-id: demo-service
spring.cloud.stream.bindings.input.destination: demo-service-window-store-changelog
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.bindings.input.consumer.headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.materializedAs: window-store
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id: ${random.int}

При использовании приложения № 2 я получаю исключение, так как -

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
    at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)

Похоже, мне нужно установить WindowSerde типа String для ключа вПриложение № 2, не знаете, как я могу установить это?

...