Мы столкнулись с проблемой из-за изменения API с Kafka 0.11 на Kafka 2.0.В нашем приложении потоков Кафки на основе 0,11 у нас было объединение двух KTables[String,Something]
, которые использовали именованное хранилище состояний:
val joinedTable = {
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}
Однако при переходе на 2.0 единственный способ явно предоставить хранилище состоянийвыглядит следующим образом:
val joinedTable = {
val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner,materialized)
}
С этим кодом замена экземпляра приложения в производственной среде завершается неудачно, потому что хранилище состояний в Kafka 0.11, вероятно, использовало ключевую серию как myTable1, так и myTable2.
org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
Помимо выполнения kafka-streams-application-reset, существуют ли другие, может быть, лучшие способы решения этой проблемы?