Я пишу приложение в Kafka v1.0 Streams и сталкиваюсь со странной проблемой времени выполнения, когда пытаюсь запустить приложение.Приложение, упрощенно, выглядит так:
KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable =
groupedStream
.aggregate( ()-> new ValueClass(), // initializer
( string, invalue, aggvalue ) -> {
ValueClass aggResult = f( invalue, aggvalue );
return aggResult; }, // aggregator
Materialized.with( Serdes.String(), ValueClassSerde )
);
Я подтвердил с помощью операции print( Printed.toSysOut() )
, что вход KStream выглядит так, как я ожидал, и добавив groupByKey()
, приложение все еще работает, но когда ядобавив операцию агрегирования, я получаю ошибку во время выполнения:
java.lang.ClassCastException: java.lang.String cannot be cast to ValueClass
Я просмотрел программу несколько раз, и она все еще выглядит разумной для меня, и я не могу понять, что пытается обрабатыватьдля приведения String
к экземпляру ValueClass
.
Не могли бы вы объяснить, что говорит мне сообщение об ошибке, и что мне нужно сделать, чтобы решить проблему?
Следующая проблема: я изменил приведенный выше код, добавив шаг для преобразования выходной таблицы агрегации в поток и напечатав:
KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable =
groupedStream
.aggregate( ()-> new ValueClass(), // initializer
( string, invalue, aggvalue ) -> {
ValueClass aggResult = f( invalue, aggvalue );
return aggResult; }, // aggregator
Materialized.with( Serdes.String(), ValueClassSerde )
);
aggregatedTable.toStream().print( Printed.toSysOut() );
Программа работает, но ничего не выходит.Более того, когда я запускаюсь под отладчиком Eclipse и помещаю точку останова в функцию агрегирования f
, выполнение никогда не достигает точки останова.
Я читал что-то о том, что KTable.toStream не выводит каждый результат для сохраненияпотоковый трафик;это то, что здесь происходит, и есть ли способ предотвратить это, чтобы вывод сразу сбрасывался?