Что означает исключение во время агрегата Kafka Stream? - PullRequest
0 голосов
/ 23 мая 2018

Я пишу приложение в Kafka v1.0 Streams и сталкиваюсь со странной проблемой времени выполнения, когда пытаюсь запустить приложение.Приложение, упрощенно, выглядит так:

KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable = 
    groupedStream
    .aggregate( ()-> new ValueClass(),  // initializer
                ( string, invalue, aggvalue ) -> { 
                     ValueClass aggResult = f( invalue, aggvalue );
                     return aggResult; }, // aggregator
                Materialized.with( Serdes.String(), ValueClassSerde )
              );

Я подтвердил с помощью операции print( Printed.toSysOut() ), что вход KStream выглядит так, как я ожидал, и добавив groupByKey(), приложение все еще работает, но когда ядобавив операцию агрегирования, я получаю ошибку во время выполнения:

java.lang.ClassCastException: java.lang.String cannot be cast to ValueClass

Я просмотрел программу несколько раз, и она все еще выглядит разумной для меня, и я не могу понять, что пытается обрабатыватьдля приведения String к экземпляру ValueClass.

Не могли бы вы объяснить, что говорит мне сообщение об ошибке, и что мне нужно сделать, чтобы решить проблему?


Следующая проблема: я изменил приведенный выше код, добавив шаг для преобразования выходной таблицы агрегации в поток и напечатав:

KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable = 
    groupedStream
    .aggregate( ()-> new ValueClass(),  // initializer
                ( string, invalue, aggvalue ) -> { 
                     ValueClass aggResult = f( invalue, aggvalue );
                     return aggResult; }, // aggregator
                Materialized.with( Serdes.String(), ValueClassSerde )
              );
aggregatedTable.toStream().print( Printed.toSysOut() );

Программа работает, но ничего не выходит.Более того, когда я запускаюсь под отладчиком Eclipse и помещаю точку останова в функцию агрегирования f, выполнение никогда не достигает точки останова.

Я читал что-то о том, что KTable.toStream не выводит каждый результат для сохраненияпотоковый трафик;это то, что здесь происходит, и есть ли способ предотвратить это, чтобы вывод сразу сбрасывался?

...