Как игнорировать некоторые виды сообщений в приложении Kafka Streams, которое читает и записывает различные типы событий из одной темы - PullRequest
0 голосов
/ 17 апреля 2019

Предположим, что приложение Spring Cloud Stream создает KStream из order topic.Интересует OrderCreated {"id":x, "productId": y, "customerId": z} события.Как только кто-то приходит, он обрабатывает его и генерирует выходное событие OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z} для того же order topic.

Проблема, с которой я сталкиваюсь, заключается в том, что, поскольку он читает и записывает из / в одну и ту же тему, поток Кафкиприложение пытается обработать свои собственные записи, что не имеет смысла.

Как я могу запретить этому приложению обрабатывать события, которые оно генерирует?

ОБНОВЛЕНИЕ: КакАртем Билан и собачако отмечают, что я подумывал об использовании KStream.filter(), но есть некоторые детали, которые заставляют меня усомниться в том, как с этим справиться:

Сейчас приложение KStream выглядит так:

interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>

Конфигурация KStream

    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

Обе привязки заказа и вывода указывают на тему заказа как пункт назначения.

Класс OrderCreated:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}

Класс OrderShipped

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}

Я использую JSON в качестве формата сообщения, поэтому сообщения выглядят так:

  • INPUT - OrderCreated: {"id":1, "productId": 7,"customerId": 20}
  • OUTPUT - OrderShipped: {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}

Я ищу лучший способ отфильтровать нежелательные сообщения , учитывая это:

Если я просто использую KStream.filter() прямо сейчас, когда я получу {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}, мой KStream<Int, OrderCreated> разархивирует событие OrderShipped как объект OrderCreated с некоторыми пустыми полями: OrderCreated(id:1, productId: 7, customerId: null).Проверка пустых полей не выглядит надежной.

A возможным решением может быть добавление другого поля, eventType = OrderCreated|OrderShipped, для каждого типа сообщения / класса, использующего эту тему.Даже в этом случае у меня был бы класс OrderCreated (помните KStream ) с атрибутом eventType = OrderShipped. Это выглядит как уродливый обходной путь .Любая идея улучшить это?

Есть ли другой, более автоматический способ справиться с этим?Например, может ли другой вид сериализации ( AVRO ?) Предотвратить обработку сообщений, если они не соответствуют ожидаемой схеме (OrderCreated)?Этот способ поддержки нескольких схем (типов событий) в одной и той же теме, по-видимому, является хорошей практикой в ​​соответствии с этой статьей: https://www.confluent.io/blog/put-several-event-types-kafka-topic/ Однако не ясно, как демонтировать / десериализовать различные типы.

Ответы [ 2 ]

1 голос
/ 21 апреля 2019

Я принял ответ Бруно как верный способ решить эту проблему.Однако я думаю, что я придумал более простой / логичный способ, используя иерархию событий, аннотированных JsonTypeInfo.

Сначала вам нужен базовый класс для событий Order и укажите все подклассы.Обратите внимание, что в JSON-документе будет добавлено свойство типа, которое поможет Джексону маршалировать / демаршировать DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
    constructor() : this(null, null, null)
}

data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
    constructor() : this(null, null, null, null)
}

При этом производитель объектов OrderCreatedEvent сгенерирует следующее сообщение:

key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}

Теперь очередь за KStream.Я изменил подпись на KStream<Int, OrderEvent>, поскольку она может получать OrderCreatedEvent или OrderShippedEvent.В следующих двух строках ...

orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

... Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их для преобразования KStream<Int, OrderEvent> в KStream<Int, OrderCreatedEvent>

Полная логика KStream:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)


        return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    }

После этого процесса я создаю новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"} в теме заказа, но оно будет отфильтровано потоком.

0 голосов
/ 20 апреля 2019

Вы можете использовать заголовки записей Kafka для хранения типа записи. См. KIP-82 . Вы можете установить заголовки в ProducerRecord.

Обработка будет выглядеть следующим образом:

  1. Прочитайте stream типа KStream<Integer, Bytes> со значением serde Serdes.BytesSerde из темы.
  2. Используйте KStream#transformValues() для фильтрации и создания объектов. Более конкретно, в transformValues() вы можете получить доступ к ProcessorContext, который дает вам доступ к заголовкам записей, которые содержат информацию о типе записи. Тогда:

    • Если тип OrderShipped, вернуть null.
    • В противном случае создайте объект OrderCreated из объекта Bytes и верните его.

Для решения с AVRO вы можете взглянуть на следующие документы

...