Как отправить список KeyValue в Kafka? - PullRequest
2 голосов
/ 29 мая 2019

Я пытаюсь отправить список [KeyValue] в тему в приложении Kafka Streams.Но поток ожидает единственное KeyValue.Как я могу отправить KeyValues ​​в поток, а не весь список?

class MainTopology {

  val createTopology = (sourceTopic: String, sinkTopic: String) => {
    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .map[String, String]((key, value) => toFormattedEvents(key, value))
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event))
  }

}

Вторая карта не компилируется из-за этого.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

Я обновляю свой код, но теперь он выдает другую ошибку:

    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .flatMap(
      (key, value) => toFormattedEvents(key, value)
    )
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event)).toIterable.asJava
  }
Error:(15, 8) inferred type arguments [?1,?0] do not conform to method flatMap's type parameter bounds [KR,VR]
      .flatMap(
Error:(16, 20) type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[String,Option[org.minsait.streams.model.JsonMessage],Iterable[org.apache.kafka.streams.KeyValue[String,String]]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: Option[org.minsait.streams.model.JsonMessage], _ <: Iterable[_ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]]
      (key, value) => toFormattedEvents(key, value)

1 Ответ

2 голосов
/ 29 мая 2019

Взгляните на flatMap() и flatMapValues(), чтобы заменить второе map().https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html

Например:

.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))

Если вы по какой-то причине намерены продолжать использовать map, см. Ниже.

Второйкарта не компилируется из-за этого.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

Соответствующий код для справки:

.map[String, String]((key, value) => toFormattedEvents(key, value))

Вам нужно что-то вроде:

.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))

, где KR и VR являются типами ключа и значения, соответственно, как возвращено toFormattedEvents() (фактические типы возврата не ясны из вашего вопроса).Чтобы это работало, вы также должны иметь Serde для типа List[KeyValue[KR, VR]].

Позвольте мне проиллюстрировать это несколькими различными типами, чтобы было легче понять, какая часть вызова метода относится к какому типу.Предположим, мы хотим, чтобы вывод карты имел тип ключа Integer и тип значения List[KeyValue[String, Long]]:

.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))

Обратите внимание, что в этом примере назначаются одинаковые значения для каждой сопоставленной записи, но это не так.точка примера.

...