Как запросить состояние хранилища в DSL Kafka Streams для реализации идемпотентности потребителя - PullRequest
0 голосов
/ 12 июня 2019

Я работаю в сценарии, когда дублированные сообщения могут поступать потребителю (приложение KStream).Чтобы использовать типичный случай, давайте предположим, что это OrderCreatedEvent и KStream имеет логику, которая обрабатывает заказ.У события есть идентификатор заказа, который поможет мне идентифицировать дублированные сообщения.

Я хочу сделать следующее:

1) Добавить каждый заказ в постоянное хранилище состояний

2) При обработке сообщения в KStream, запросите хранилище состояний, чтобы проверить, было ли уже получено сообщение, ничего не делая в этом случае.

        val persistentKeyValueStore = Stores.persistentKeyValueStore("order-store")

        val stateStore: Materialized<Int, Order, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Order>(persistentKeyValueStore)
                        .withKeySerde(intSerde)
                        .withValueSerde(orderSerde)

        val orderTable: KTable<Int, Order> = input.groupByKey(Serialized.with(intSerde, orderSerde))
                .reduce({ _, y -> y }, stateStore)

        var orderStream: KStream<Int, Order> = ...

        orderStream.filter { XXX }
                   .map { key, value -> 
                      processingLogic()
                      KeyValue(key, value)
                   }...

В бите filter { XXX } я хотел бызапросить в хранилище состояний проверить наличие идентификатора заказа (предположим, что он используется в качестве ключа хранилища ключей), отфильтровывая уже обработанные заказы (присутствующие в хранилище состояний).

Мой первый вопрос : как я могу запросить хранилище состояний в DSL KStream, например, внутри операции фильтрации.

Второй вопрос :в этом случае, как я могу обработать прибытие нового (ранее не обработанного сообщения)?Если таблица KTable сохраняет порядок в хранилище состояний ДО выполнения KStream orderStream, сообщение уже будет в хранилище.Их следует добавлять только после завершения обработки.Как я могу это сделать?Вероятно, я не должен использовать KTable для этого, но что-то вроде:

           orderStream.filter { keystore.get(key) == null }
                   .map { key, value -> 
                       processingLogic()
                       KeyValue(key, value)
                   }
                   .foreach { key, value -> 
                       keystore.put(key, value); 
                   }

1 Ответ

0 голосов
/ 26 июня 2019

Следуя указаниям Матиаса, я реализовал это так:

DeduplicationTransformer

package com.codependent.outboxpattern.operations.stream

import com.codependent.outboxpattern.account.TransferEmitted
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.slf4j.LoggerFactory


@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private val logger = LoggerFactory.getLogger(javaClass)
    private lateinit var dedupStore: KeyValueStore<String, String>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, String>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            logger.warn("****** Detected duplicated transfer {}", key)
            null
        } else {
            logger.warn("****** Registering transfer {}", key)
            dedupStore.put(key, key)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

FraudKafkaStreamsConfiguration

const val DEDUP_STORE = "dedup-store"

@Suppress("UNCHECKED_CAST")
@EnableBinding(TransferKafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo(value = ["outputKo", "outputOk"])
    fun process(@Input("input") input: KStream<String, TransferEmitted>): Array<KStream<String, *>>? {
        val fork: Array<KStream<String, *>> = input
                .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                .branch(Predicate { _: String, value -> fraudDetectionService.isFraudulent(value) },
                        Predicate { _: String, value -> !fraudDetectionService.isFraudulent(value) }) as Array<KStream<String, *>>
                 ...
...