При использовании kafka-streams-scala, портирования lightbend kafka-streams-scala, в Scala 2.12 мы сталкиваемся со странной проблемой с компилятором, который не может правильно обрабатывать вызов перегруженного метода
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable
object FailingExample {
private val builder = new StreamsBuilder()
class MyClass
class MyClass2
class MyClass3
implicit val consumedMyClass1: Consumed[String,MyClass] = null
implicit val consumedMyClass2: Consumed[String,MyClass2] = null
val table:KTable[String,MyClass] = builder.table[String,MyClass]("myTopic1")
val table1:KTable[String,MyClass2] = builder.table[String,MyClass2]("myTopic1")
table.join(table1)(
(myClass:MyClass, myClass2:MyClass2) => new MyClass3
)
Компилятор завершается ошибкой с
[error] /Users/edmondoporcu1/Development/credimi/user-preferences/user-preferences-salesforce-updater/src/main/scala/com/credimi/kafka/FailingExample.scala:25:9: ambiguous reference to overloaded definition,
[error] both method join in class KTable of type [VO, VR](other: org.apache.kafka.streams.scala.kstream.KTable[String,VO])(joiner: (com.credimi.kafka.FailingExample.MyClass, VO) => VR, materialized: org.apache.kafka.streams.kstream.Materialized[String,VR,org.apache.kafka.streams.scala.ByteArrayKeyValueStore])org.apache.kafka.streams.scala.kstream.KTable[String,VR]
[error] and method join in class KTable of type [VO, VR](other: org.apache.kafka.streams.scala.kstream.KTable[String,VO])(joiner: (com.credimi.kafka.FailingExample.MyClass, VO) => VR)org.apache.kafka.streams.scala.kstream.KTable[String,VR]
[error] match argument types (org.apache.kafka.streams.scala.kstream.KTable[String,com.credimi.kafka.FailingExample.MyClass2])
[error] table.join(table1)(
Однако это не имеет особого смысла, поскольку мы передаем только аргумент в списке второго аргумента, и поэтому компилятор не должен иметь никакой двусмысленности.Мы включили для справки два метода в org.apache.kafka.streams.scala.kstream.Ktable:
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
*
* @param other the other [[KTable]] to be joined with this [[KTable]]
* @param joiner a function that computes the join result for a pair of matching records
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#join`
*/
def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
inner.join[VO, VR](other.inner, joiner.asValueJoiner)
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
*
* @param other the other [[KTable]] to be joined with this [[KTable]]
* @param joiner a function that computes the join result for a pair of matching records
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#join`
*/
def join[VO, VR](other: KTable[K, VO])(
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)