Почему хранилище состояний не работает с проблемой сериализации? - PullRequest
0 голосов
/ 27 декабря 2018

Я использую Kafka Streams 1.1.0.

Я создал следующую топологию:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
      --> KTABLE-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000001
    Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
      --> none
      <-- KTABLE-SOURCE-0000000002

Код выглядит следующим образом:

case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
  .someAdditionalTransformation
  .mapValues[Test](
      new ValueMapperWithKey[String, String, Test] {
         override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
      }, mal)

IЯ хотел бы создать хранилище с запросами, которое можно будет использовать для последующего запроса (получения отфильтрованных / преобразованных значений).

Я запустил простой тест с использованием TopologyTestDriver, и выдается следующее исключение:

Причина: java.lang.ClassCastException: com.example.kafka.streams.topology.Test не может быть преобразован в java.lang.String в org.apache.kafka.common.serialization.StringSerializer.serialize.(StringSerializer.java:28) в org.apache.kafka.streams.state.StateSerdes.rawValue (StateSerdes.java:178) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredBtesStore)66) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredKeyValueBytesStore.java:57) в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueMueee.организация.apache.kafka.streams.state.internals..kafka.streams.kstream.internals.KTableMapValues ​​$ KTableMapValuesProcessor.process (KTableMapValues.java:83) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) илиkafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apacheka.ka.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174) по адресу org.apache.kafka.streams.kstream.internals.KTableFilter $ KTableFilterProcessor.process (KTableFilter.java:89) по адресу org.apache.kafka.internals.KTableFilter $ KTableFilterProcessor.process (KTableFilter.java:63) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174) в org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.(CachingKeyValueStore.java:38) в org.apache.kafka.streams.state.internals.CachingKeyValueStore $ 1.apply (CachingKeyValueStore.java:83) в org.apache.kafka.streams.state.internals.NamedCache.flush (NameCache.flush.Java: 142) в org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:100) в org.apache.kafka.streams.state.internals.ThreadCache.flush (ThreadCache.java:127) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush (CachingKeyValueStore.java:123) в орг.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush (InnerMeteredKeyValueStore.java:267) по адресу org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush (MeteredKey14ap.ka.a.ka.a.j.j.j.j.au.j.b.y.j.au.j.streams.processor.internals.ProcessorStateManager.flush (ProcessorStateManager.java:244) ... еще 58

Есть идеи, почему и как это исправить?

1 Ответ

0 голосов
/ 28 декабря 2018

После некоторого расследования я обнаружил причину вышеуказанного исключения.

Я создал Materialized для хранения данных, но я не передал Serdes для ключа или значения.

ЕслиВы не передаете, используются значения по умолчанию.В моем случае это было StringSerializer, и я пытался сериализовать объект класса Test с помощью StringSerializer mea culpa

Для прохождения Serdes .withValueSerde(GenericSerde[Test]) требуется толькобыть добавлен, где GenericSerdes является реализацией org.apache.kafka.common.serialization.Serde

val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
    .withValueSerde(GenericSerde[Test])
...