Apache kafka KTable агрегация ClassCastException - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть тема apache kafka "historyTopic", связанная с потоком:

KStream<Long, byte[]> stream = builder.stream("historyTopic");

У меня также есть таблица, связанная с потоком:

KTable<Long, HistoryObject> table = stream.groupByKey().aggregate(() -> {
            return new HistoryObject();
        }, (key, value, aggregate) -> {
        try {
           aggregate.add(deserializeModel(value));
        } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
        }
        return aggregate;
    }, Materialized.with(Serdes.Long(), Serdes.serdeFrom(serializer, deserializer)).as("history-store"));

Я ожидаю значение вагрегат должен быть байтом [], а агрегатная переменная - объектом класса HistoryObject. В инициализаторе я сначала создаю новый объект, а в методе агрегации я рассчитываю десериализовать значение байта [] и добавить его в список в агрегате (HistoryObject)., но я получаю следующее исключение:

java.lang.ClassCastException: [B cannot be cast to com.maxflow.historyservice.model.HistoryObject
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:91) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) ~[kafka-streams-2.0.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) ~[kafka-streams-2.0.1.jar:na]

метод deserializeModel:

@SuppressWarnings("unchecked")
    private static Card deserializeModel(byte[] serialized) throws IOException {
    ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
    ObjectInput in = null;
    Card  historyData = null;
    try {
        in = new ObjectInputStream(bis);
        historyData = (Card) in.readObject();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        bis.close();
    }
    return historyData;
    }

В этом методе я десериализирую byte [] из потока в объект Card и после этого япопробуйте добавить его в список из HistoryObject

Мои классы сериализатора и десериализатора выглядят так:

public class HistoryDataValueSerializer implements Closeable, AutoCloseable, Serializer<HistoryObject> {

    @SuppressWarnings("unused")
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override public byte[] serialize(String arg0, HistoryObject value) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
          retVal = objectMapper.writeValueAsString(value).getBytes();
        } catch (Exception e) {
          e.printStackTrace();
        }
        return retVal;
      }

    @Override
    public void close()
    {

    }

  }


public class HistoryDataValueDeserializer implements Closeable, AutoCloseable, Deserializer<HistoryObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public HistoryObject deserialize(String s, byte[] value) {
    ObjectMapper mapper = new ObjectMapper();
    HistoryObject historyObject = null;
    try {
        historyObject =  mapper.readValue(value, HistoryObject.class);    
    } catch (Exception e) {

        e.printStackTrace();
    }
    return historyObject;
    }


    @Override
    public void close() {

    }

  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...