Невозможно применить selectKey в потоке kafka - PullRequest
0 голосов
/ 21 апреля 2020

Я пытаюсь написать простую фильтрующую логику c с потоком kafka, но получаю ошибку каждый раз с методом selectKey и не могу выполнить дальнейшую процедуру.

val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)

    val filteredValue: KStream[String, String] =  textLines.
    filter((key: String, value: String) => value.contains(","))
     .selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)

Ошибка:

overloaded method value selectKey with alternatives: (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String],x$2: org.apache.kafka.streams.kstream.Named)org.apache.kafka.streams.kstream.KStream[String,String] <and> (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: String])org.apache.kafka.streams.kstream.KStream[String,String] cannot be applied to ((String, String) ⇒ String)

1 Ответ

3 голосов
/ 21 апреля 2020

В Scala иногда требуется указать точные привязки типов, потому что вывод типов не будет работать так, как вы привыкли в Scala. Когда вы обнаружите эти ошибки ввода, вы можете применить практическое правило - разделять строки новыми явно набранными значениями, пока не найдете место, где вам нужно указать тип.

Похожие проблемы и решения:

The compiler need some help to infer the correct type for the aggregator parameter.

To make it compile you can try:

val store: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] = ???

private def save(pea: KStream[String, String]): Unit = {
  val aggregator: Aggregator[String, String, String] = (_, _, value: String) => value
  pea
    .groupByKey()
    .aggregate(() => """{folder: ""}""",
      aggregator,
      store)
}

из здесь .

И здесь - это еще один из Java, который иллюстрирует другой вид строгой типизации также в Kafka Streams:

Try moving generic types specification after as method:

val state: KTable[String, String]  = builder
    .table[String, String]("BARY-PATH", Materialized.as[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("PATH-STORE"))

Как видно из сигнатуры Java, для методов stati c следует указывать типы generi c для метода, а не для класса.

You может найти много других примеров, даже за пределами Kafka Streams, о необходимости явной типизации в Scala.

...