KStream объект, когда Autowired не печатает значение - PullRequest
0 голосов
/ 18 сентября 2018

Я пытаюсь создать bean-компонент kstreams и автоматически связать его с моим сервисом. Но даже если я получаю тот же объект, stream.print () не дает никакого значения, но печать внутри того же компонента работает. Я думаю, что я не получаю Same StreamBuilder с конфигом.

Файл конфигурации

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Autowired private KafkaProperties kafkaProperties;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
        props.put(JsonDeserializer.DEFAULT_VALUE_TYPE, String.class);
        return new StreamsConfig(props);
    }

    @Bean
    public KStream<String, String> kStreamJson(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("topictest", Consumed.with(Serdes.String(), Serdes.String()));
        //stream.print();
        return stream;
    }

}

Услуги

Функция печати здесь не выдает никаких ошибок и не печатает никаких значений

@Service
public class KStreamsService {

    @Autowired
    KStream<String, String> kStream;

    void process() {
        System.out.println("Hai");
        kStream.print();
    }
}

Главная

@SpringBootApplication
public class KStreamsApplication {

    @Autowired
    KStreamsService kStreamsService;

    public static void main(String[] args) {
        SpringApplication.run(KStreamsApplication.class, args);
    }

    private void run() {

        kStreamsService.process();

    }
}

Я что-то здесь не так делаю?

1 Ответ

0 голосов
/ 18 сентября 2018

Непонятно, откуда вы вызываете этот run() метод в приложении.

Однако уже слишком поздно вызывать stream.print() в службе, поскольку к этому времени потоки уже были запущены.

Это работает для меня ...

@SpringBootApplication
public class KStreamsApplication {

    @Autowired
    KStreamsService kStreamsService;

    public static void main(String[] args) {
        SpringApplication.run(KStreamsApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamsBuilderFactoryBean fb) {
        fb.setAutoStartup(false);
        return args -> {
            run();
            fb.start();
        };
    }

    private void run() {

        kStreamsService.process();

    }

}

и

[KSTREAM-SOURCE-0000000000]: null, foo
...