Как добавить StateStore с помощью StateStoreBuilder в приложении Spring Cloud Stream Kafka Streams - PullRequest
0 голосов
/ 17 июня 2019

Собственный API Kafka позволяет создавать и добавлять хранилище состояний с помощью StreamsBuilder :

    final StreamsBuilder builder = new StreamsBuilder();
    ...
    final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(storeName,
                                         retentionPeriod,
                                         windowSize,
                                         false
            ),
            Serdes.String(),
            Serdes.Long());

    builder.addStateStore(dedupStoreBuilder);

Я хотел бы сделать то же самое с помощью Spring Cloud Streams, но не могувыяснить способ доступа к StreamsBuilder для добавления магазина.

Я пытался получить StreamsBuilderFactoryBean, как указано в doc , надеясь, что я смогу получить StreamsBuilder объект от него, но бин, по-видимому, недоступен:

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {

    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {

        val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
        ...
        return xxx

    }

}

Причина: org.springframework.beans.factory.NoSuchBeanDefinitionException: ни один бин с именем stream-builder-процесс 'доступен

В любом случае, я даже не уверен, что это правильный способ сделать это.Итак, как мы можем программно создать StateStore?

1 Ответ

1 голос
/ 17 июня 2019

Я не видел документированной процедуры из-за моей версии Scs (Fishtown SR3), но хорошая новость в том, что возможно создать State Store декларативно, начиная с Germantown:

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)

    }

}
...