Можно ли объединить объект вместо строки с API весеннего облачного потока? - PullRequest
0 голосов
/ 07 ноября 2018

Я хочу использовать API-интерфейс Spring Cloud Stream для агрегации событий из темы. Поэтому я использую в качестве ввода KStream.

KStream<Object, LoggerCreatedMessage>

Теперь я хочу использовать агрегатор для хранения моего нового объекта в хранилище KeyValue, поэтому я использую следующий код:

input
  .map((key, value) -> {
    return new KeyValue<>(value.logger_id,value);
  })
  /*.groupBy(
    (s, loggerEvent) -> loggerEvent.logger_id,
    Serialized.with(null, loggerEventSerde))*/
  .groupByKey()
  .aggregate(
    String::new,
    (s, loggerEvent, vr) -> {
      return vr;
    },
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME).withKeySerde(Serdes.String()).
      withValueSerde(Serdes.String())
  );

Почему я могу использовать только строку в качестве инициализатора, если невозможно использовать какой-либо объект?

Вместо String :: new я хотел использовать LoggerDomain :: new, но я получаю только это сообщение об ошибке:

Неверный тип возврата в ссылке на метод: невозможно преобразовать LoggerDomain в VR

Мне что-то не хватает?

1 Ответ

0 голосов
/ 07 ноября 2018

Вы определяете <key,value> как <String, String> через Materialized.<String, String, KeyValueStore<Bytes, byte[]>> - если тип значения должен быть LoggerDomain, он должен быть Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte[]>>().

Обратите внимание, что для LoggerDomain в этом случае необходимо указать Serde для Materialized.

...