Kafka Streams Scala group Проблемы с преобразованием типов - PullRequest
0 голосов
/ 23 мая 2018

Я пытаюсь реализовать простое приложение потоковой обработки в Scala, но сталкиваюсь с проблемами преобразования типов.Любые указатели высоко ценятся

Вот фрагмент кода Я пытаюсь получить компиляцию Scala для компиляции

case class ItemValue(LOCATION:String,
                          U_ID:String,
                          UOM:String,
                          R_ID:String,
                          ITEM_TYPE:String,
                          B_ID:String,
                          RECORDED_TIMESTAMP:String,
                          P_ID:String,
                          VALUE:String,
                          RECORDED_DTM:String,
                          DATA_TYPE:String)

val valueSerde : JSONSerde[ItemValue] = new JSONSerde[ItemValue]
  val consumed = Consumed.`with`(Serdes.String(), valueSerde)
  val items: KStream[String, ItemValue] = builder.stream("raw_topic", consumed)
    .groupBy(
      new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
        override def apply(key: String, value: ItemValue) : KeyValue[String, ItemValue] = {
          new KeyValue(value.P_ID+"_"+value.U_ID+"_"+value.R_ID+"_"+value.B_ID+"_"+value.ITEM_TYPE,value)
        }
      },
      Serialized.`with`(
        Serdes.String(), 
        new JSONSerde[ItemValue])
    )

Вот ошибка компилятора, которую я не могу понять

type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[String,com.example.streams.ItemValue,org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: com.example.streams.ItemValue, Object]
Note: String <: Any, but Java-defined trait KeyValueMapper is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: com.example.streams.ItemValue <: Any, but Java-defined trait KeyValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue] <: Object, but Java-defined trait KeyValueMapper is invariant in type VR.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
      new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
      ^
ItemStatsApp.scala:59: type mismatch;
 found   : org.apache.kafka.streams.kstream.Serialized[String,com.example.streams.ItemValue]
 required: org.apache.kafka.streams.kstream.Serialized[Object,com.example.streams.ItemValue]
Note: String <: Object, but Java-defined class Serialized is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
      Serialized.`with`(
                       ^
two errors found

Kafka FAQ (из Confluent) и ответ здесь говорит, что это может произойти, если типы не определены явно в Scala, но я не вижу, где я не смог быть явным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...