Как перенести KTable-KTable leftJoin с явным хранилищем состояний из Kafka 0.11 в Kafka 2.0? - PullRequest
0 голосов
/ 28 ноября 2018

Мы столкнулись с проблемой из-за изменения API с Kafka 0.11 на Kafka 2.0.В нашем приложении потоков Кафки на основе 0,11 у нас было объединение двух KTables[String,Something], которые использовали именованное хранилище состояний:

val joinedTable = {
    myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
        MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}

Однако при переходе на 2.0 единственный способ явно предоставить хранилище состоянийвыглядит следующим образом:

val joinedTable = {
    val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
    myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
        MyJoiner,materialized)
}

С этим кодом замена экземпляра приложения в производственной среде завершается неудачно, потому что хранилище состояний в Kafka 0.11, вероятно, использовало ключевую серию как myTable1, так и myTable2.

org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)

Помимо выполнения kafka-streams-application-reset, существуют ли другие, может быть, лучшие способы решения этой проблемы?

1 Ответ

0 голосов
/ 29 ноября 2018

Вы можете передать ключ key явно:

val joinedTable = {
  val materialized = Materialized
    .as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store")
    .withKeySerde(new StringSerde())
    .withValueSerde(new Serde[MyClass1Class2])

  myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table, new MyJoiner, materialized)
}
...