Взгляните на flatMap()
и flatMapValues()
, чтобы заменить второе map()
.https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html
Например:
.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))
Если вы по какой-то причине намерены продолжать использовать map
, см. Ниже.
Второйкарта не компилируется из-за этого.
Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]
Соответствующий код для справки:
.map[String, String]((key, value) => toFormattedEvents(key, value))
Вам нужно что-то вроде:
.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))
, где KR
и VR
являются типами ключа и значения, соответственно, как возвращено toFormattedEvents()
(фактические типы возврата не ясны из вашего вопроса).Чтобы это работало, вы также должны иметь Serde
для типа List[KeyValue[KR, VR]]
.
Позвольте мне проиллюстрировать это несколькими различными типами, чтобы было легче понять, какая часть вызова метода относится к какому типу.Предположим, мы хотим, чтобы вывод карты имел тип ключа Integer
и тип значения List[KeyValue[String, Long]]
:
.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))
Обратите внимание, что в этом примере назначаются одинаковые значения для каждой сопоставленной записи, но это не так.точка примера.