Я пытаюсь реализовать простое приложение потоковой обработки в Scala, но сталкиваюсь с проблемами преобразования типов.Любые указатели высоко ценятся
Вот фрагмент кода Я пытаюсь получить компиляцию Scala для компиляции
case class ItemValue(LOCATION:String,
U_ID:String,
UOM:String,
R_ID:String,
ITEM_TYPE:String,
B_ID:String,
RECORDED_TIMESTAMP:String,
P_ID:String,
VALUE:String,
RECORDED_DTM:String,
DATA_TYPE:String)
val valueSerde : JSONSerde[ItemValue] = new JSONSerde[ItemValue]
val consumed = Consumed.`with`(Serdes.String(), valueSerde)
val items: KStream[String, ItemValue] = builder.stream("raw_topic", consumed)
.groupBy(
new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
override def apply(key: String, value: ItemValue) : KeyValue[String, ItemValue] = {
new KeyValue(value.P_ID+"_"+value.U_ID+"_"+value.R_ID+"_"+value.B_ID+"_"+value.ITEM_TYPE,value)
}
},
Serialized.`with`(
Serdes.String(),
new JSONSerde[ItemValue])
)
Вот ошибка компилятора, которую я не могу понять
type mismatch;
found : org.apache.kafka.streams.kstream.KeyValueMapper[String,com.example.streams.ItemValue,org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue]]
required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: com.example.streams.ItemValue, Object]
Note: String <: Any, but Java-defined trait KeyValueMapper is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: com.example.streams.ItemValue <: Any, but Java-defined trait KeyValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue] <: Object, but Java-defined trait KeyValueMapper is invariant in type VR.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
^
ItemStatsApp.scala:59: type mismatch;
found : org.apache.kafka.streams.kstream.Serialized[String,com.example.streams.ItemValue]
required: org.apache.kafka.streams.kstream.Serialized[Object,com.example.streams.ItemValue]
Note: String <: Object, but Java-defined class Serialized is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
Serialized.`with`(
^
two errors found
Kafka FAQ (из Confluent) и ответ здесь говорит, что это может произойти, если типы не определены явно в Scala, но я не вижу, где я не смог быть явным.