Собственный API Kafka позволяет создавать и добавлять хранилище состояний с помощью StreamsBuilder :
final StreamsBuilder builder = new StreamsBuilder();
...
final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName,
retentionPeriod,
windowSize,
false
),
Serdes.String(),
Serdes.Long());
builder.addStateStore(dedupStoreBuilder);
Я хотел бы сделать то же самое с помощью Spring Cloud Streams, но не могувыяснить способ доступа к StreamsBuilder
для добавления магазина.
Я пытался получить StreamsBuilderFactoryBean
, как указано в doc , надеясь, что я смогу получить StreamsBuilder
объект от него, но бин, по-видимому, недоступен:
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
...
return xxx
}
}
Причина: org.springframework.beans.factory.NoSuchBeanDefinitionException: ни один бин с именем stream-builder-процесс 'доступен
В любом случае, я даже не уверен, что это правильный способ сделать это.Итак, как мы можем программно создать StateStore
?