Я принял ответ Бруно как верный способ решить эту проблему.Однако я думаю, что я придумал более простой / логичный способ, используя иерархию событий, аннотированных JsonTypeInfo
.
Сначала вам нужен базовый класс для событий Order и укажите все подклассы.Обратите внимание, что в JSON-документе будет добавлено свойство типа, которое поможет Джексону маршалировать / демаршировать DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
При этом производитель объектов OrderCreatedEvent сгенерирует следующее сообщение:
key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}
Теперь очередь за KStream.Я изменил подпись на KStream<Int, OrderEvent>
, поскольку она может получать OrderCreatedEvent или OrderShippedEvent.В следующих двух строках ...
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
... Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их для преобразования KStream<Int, OrderEvent>
в KStream<Int, OrderCreatedEvent>
Полная логика KStream:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
После этого процесса я создаю новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"}
в теме заказа, но оно будет отфильтровано потоком.