Я использую Kafka Streams 1.1.0.
Я создал следующую топологию:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
--> KTABLE-SOURCE-0000000002
Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
--> KTABLE-MAPVALUES-0000000003
<-- KSTREAM-SOURCE-0000000001
Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
--> none
<-- KTABLE-SOURCE-0000000002
Код выглядит следующим образом:
case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
.someAdditionalTransformation
.mapValues[Test](
new ValueMapperWithKey[String, String, Test] {
override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
}, mal)
IЯ хотел бы создать хранилище с запросами, которое можно будет использовать для последующего запроса (получения отфильтрованных / преобразованных значений).
Я запустил простой тест с использованием TopologyTestDriver
, и выдается следующее исключение:
Причина: java.lang.ClassCastException: com.example.kafka.streams.topology.Test не может быть преобразован в java.lang.String в org.apache.kafka.common.serialization.StringSerializer.serialize.(StringSerializer.java:28) в org.apache.kafka.streams.state.StateSerdes.rawValue (StateSerdes.java:178) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredBtesStore)66) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredKeyValueBytesStore.java:57) в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueMueee.организация.apache.kafka.streams.state.internals..kafka.streams.kstream.internals.KTableMapValues $ KTableMapValuesProcessor.process (KTableMapValues.java:83) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) илиkafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apacheka.ka.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174) по адресу org.apache.kafka.streams.kstream.internals.KTableFilter $ KTableFilterProcessor.process (KTableFilter.java:89) по адресу org.apache.kafka.internals.KTableFilter $ KTableFilterProcessor.process (KTableFilter.java:63) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174) в org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.(CachingKeyValueStore.java:38) в org.apache.kafka.streams.state.internals.CachingKeyValueStore $ 1.apply (CachingKeyValueStore.java:83) в org.apache.kafka.streams.state.internals.NamedCache.flush (NameCache.flush.Java: 142) в org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:100) в org.apache.kafka.streams.state.internals.ThreadCache.flush (ThreadCache.java:127) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush (CachingKeyValueStore.java:123) в орг.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush (InnerMeteredKeyValueStore.java:267) по адресу org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush (MeteredKey14ap.ka.a.ka.a.j.j.j.j.au.j.b.y.j.au.j.streams.processor.internals.ProcessorStateManager.flush (ProcessorStateManager.java:244) ... еще 58
Есть идеи, почему и как это исправить?