Исключение при обработке данных во время процесса потока Кафки - PullRequest
0 голосов
/ 11 мая 2018

Я работаю над потоками Кафки, используя приведенный ниже код.Я проверяю условие фильтра из объекта JSON для условия, если "UserID":"1".Пожалуйста, используйте код ниже

builder.<String,String>stream(Serdes.String(), Serdes.String(), topic)
                   .filter(new Predicate <String, String>() {

               String userIDCheck = null;

               @Override
            public boolean test(String key, String value) {

                   try {
                       JSONObject jsonObj = new JSONObject(value);

                       userIDCheck = jsonObj.get("UserID").toString();
                       System.out.println("userIDCheck: " + userIDCheck);                          
                   } catch (JSONException e) {
                       // TODO Auto-generated catch block
                       e.printStackTrace();
                   }

                   return userIDCheck.equals("1");
               }
            })
           .to(streamouttopic);

значение: {"UserID": "1", "Address": "XXX", "AccountNo": "989", "UserName": "Stella", "AccountType ":" YYY "}

Я получаю следующее сообщение об ошибке:

    Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)

Значение и условие в порядке из приведенного выше потокового кода, я не мог понять, почему он выдает это исключение во времяпри выполнении кода Steam.

1 Ответ

0 голосов
/ 11 мая 2018

Вы также должны указать правильные Serdes для операции to().В противном случае он использует Serdes по умолчанию из StreamsConfig, и эти ByteArraySerde - и String не могут быть преобразованы в byte[].

Вам необходимо сделать:

.to(streamoutputtopic, Produced.with(Serdes.String(), Serdes.String()));
...