Ошибка при создании KTable с пользовательским ключом - PullRequest
1 голос
/ 28 апреля 2020

Вариант использования - есть топи c с сообщениями (ноль, метаданные). Мне нужно создать Ktable из topi c с ключом (metadata.entity_id) и значением в качестве метаданных. Эта таблица будет позже использована для объединения с потоком с тем же ключом.

    private final static String KAFKA_BROKERS = "localhost:9092";
    private final static String APPLICATION_ID = "TestMetadataTable";
    private final static String AUTO_OFFSET_RESET_CONFIG = "earliest";
    private final static String METADATA_TOPIC = "test-metadata-topic";


    public static void main (String args[]) {

        //Setting the Stream configuration params.
        final Properties kafkaStreamConfiguration = new Properties();
        kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);

        kafkaStreamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);      

      //Creating Serdes for MetricMetadata
        GenericJsonSerializer<MetricMetadata> metadataJsonSerializer = new GenericJsonSerializer<MetricMetadata>();
        GenericJsonDeserializer<MetricMetadata> metadataJsonDeserializer = new GenericJsonDeserializer<MetricMetadata>(MetricMetadata.class);
        Serde<MetricMetadata> metadataSerde = Serdes.serdeFrom(metadataJsonSerializer, metadataJsonDeserializer);


        //Creating kafka stream.
        final StreamsBuilder builder = new StreamsBuilder();

       KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value))            
                .aggregate( () -> null,
                         (key, value, aggValue) -> value,
                         (key, value, aggValue) -> value
                        );

        final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamConfiguration);
        streams.start();        
    }

Как только я отправляю сообщение в топи c - METADATA_TOPI C. Это приводит к ошибке ниже. Я что-то здесь упускаю? Кафка-стримы 2.2.0

Exception in thread "TestMetadataTable-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store test-metadata-topic-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:471)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:95)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:102)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:127)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:72)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:224)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
    ... 10 more
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 28 more

1 Ответ

2 голосов
/ 28 апреля 2020

В этом случае вам необходимо предоставить Serdes для операции KTable.groupBy() через Grouped, так как вызов groupBy инициирует перераспределение. Вам также нужно будет предоставить ту же Serdes для агрегатной операции для хранилища состояний.

Кроме того, поскольку ключом является null, я думаю, что вы должны использовать KStream изначально. Затем позвоните groupByKey (вам все еще нужно предоставить Serdes через Grouped), и агрегация даст вам KTable, который вы хотите.

С макушки головы, что-то вроде этого должно работать

builder.stream((METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
        .selectKey((key, value) -> KeyValue.pair(value.getEntity_id(), value))
        .groupByKey(Grouped.with(Serdes.String(), metadataSerde))
        .aggregate( () -> null,
            (key, value, aggValue) -> value,
            (key, value, aggValue) -> value,
            Materialized.with(Serdes.String(), metadataSerde)
        );
...