Я хочу использовать API-интерфейс Spring Cloud Stream для агрегации событий из темы.
Поэтому я использую в качестве ввода KStream.
KStream<Object, LoggerCreatedMessage>
Теперь я хочу использовать агрегатор для хранения моего нового объекта в хранилище KeyValue, поэтому я использую следующий код:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
Почему я могу использовать только строку в качестве инициализатора, если невозможно использовать какой-либо объект?
Вместо String :: new я хотел использовать LoggerDomain :: new, но я получаю только это сообщение об ошибке:
Неверный тип возврата в ссылке на метод: невозможно преобразовать LoggerDomain в VR
Мне что-то не хватает?