KTable не возвращает данные в приложении Spring Boot, однако его можно запросить - PullRequest
0 голосов
/ 06 сентября 2018

У меня есть приложение Spring Boot, работающее с Kafka Streams. У меня есть KTable с некоторыми котировками финансовой валюты, который создается следующим образом:

@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(quoteTopicName,
            Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(Quote.class)));
}

Я @Autowire этот компонент в другом компоненте, и проверить его с помощью следующего кода:

@Autowired
private KTable<String, Quote> indicativeQuotes;

@PostConstruct
private void postConstruct() {
    doPrint();
}

public void doPrint() {
        ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
        store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
        indicativeQuotes.foreach((k,v) -> log.info(k));}

Код регистрирует правильные значения при запросах через store, но ничего не выводит в foreach (), как если бы таблица была пустой. Я также пробовал print () и другие параметры - все ничего не выводят без каких-либо исключений.

Я начинаю думать, что я не могу так вводить компоненты KTable, но документации Spring по теме потоков kafka довольно мало, и я не могу найти хороших примеров. Любая помощь будет оценена.

Обновление.

Мой пример использования: у меня запланированное задание Quartz, которое должно записывать текущее состояние KTable в тему Kafka при запуске, как показано ниже:

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
    log.info("Job was triggered by: {}", triggerKey.getName());

    indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
            .mapValues(quoteToCourseFixedMapper)
            .toStream()
            .peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
            .to(quoteEventTopicName);
}

Но я думаю, что этот код не работает, потому что он не является частью топологии обработки, и я не могу просто взять данные из Ktable по требованию. Я немного озадачен, конечно, я могу запрашивать данные через хранилище, когда происходит событие, но, может быть, есть лучший шаблон для такого варианта использования? По сути, мне интересно, возможно ли включить эти инициируемые события задания как часть конвейера обработки.

...