Я пытаюсь создать bean-компонент kstreams и автоматически связать его с моим сервисом. Но даже если я получаю тот же объект, stream.print () не дает никакого значения, но печать внутри того же компонента работает. Я думаю, что я не получаю Same StreamBuilder с конфигом.
Файл конфигурации
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
@Autowired private KafkaProperties kafkaProperties;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
props.put(JsonDeserializer.DEFAULT_VALUE_TYPE, String.class);
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("topictest", Consumed.with(Serdes.String(), Serdes.String()));
//stream.print();
return stream;
}
}
Услуги
Функция печати здесь не выдает никаких ошибок и не печатает никаких значений
@Service
public class KStreamsService {
@Autowired
KStream<String, String> kStream;
void process() {
System.out.println("Hai");
kStream.print();
}
}
Главная
@SpringBootApplication
public class KStreamsApplication {
@Autowired
KStreamsService kStreamsService;
public static void main(String[] args) {
SpringApplication.run(KStreamsApplication.class, args);
}
private void run() {
kStreamsService.process();
}
}
Я что-то здесь не так делаю?