Это мое приложение, которое просто получает ссылку на KStream из темы клиента (привязка ввода) и другую ссылку из темы заказа (привязка заказа).Затем он создает KTable из темы клиента и выполняет объединение с порядком KStream:
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") order: KStream<Int, Order>): KStream<Int, OrderShipped> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderSerde = JsonSerde<Order>(Order::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (order.selectKey { key, value -> value.customerId } as KStream<Int, Order>)
.join(customerTable, { orderIt, customer ->
OrderShipped(orderIt.id)
},
Joined.with(intSerde, orderSerde, customerSerde))
}
}
Предположительно, это должно быть запись в выходную привязку (@SendTo("output")
), указывающую на тему заказа на поставку.Однако сообщения на эту тему не пишутся.
Конфигурация процессора:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Input("output")
fun output(): KStream<String, OrderShipped>
}
**application.yml**
spring:
application:
name: spring-boot-shipping-service
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
destination: customer
contentType: application/json
order:
destination: order
contentType: application/json
output:
destination: ordershipments
contentType: application/json