I am trying out the example that integrates Spring Boot Reactive with Kafka and Angular, as described in this tutorial: https://raymondhlee.wordpress.com/2019/11/23/a-reactive-stack-with-spring-boot-kafka-and-angular/
However, a GET request to http://localhost:8181/weather
never gets a response from the method below, and the request status stays pending.
Controller:
private Flux<WeatherInfoEvent> bridge;

public WeatherInfoController() {
    // (3) Broadcast to several subscribers
    this.bridge = createBridge().publish().autoConnect().cache(10).log();
}

// (1) Spring MVC annotation
@GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
public Flux<WeatherInfoEvent> getWeatherInfo() {
    System.out.println(" @$@$#@$ Hey i was called....");
    System.out.println(" @#$#@$# bridge.toString()" + bridge.getPrefetch() + bridge.toString());
    return bridge;
}

private Flux<WeatherInfoEvent> createBridge() {
    Flux<WeatherInfoEvent> bridge = Flux.create(sink -> { // (2)
        processor.register(new WeatherInfoEventListener() {
            @Override
            public void processComplete() {
                sink.complete();
            }

            @Override
            public void onData(WeatherInfoEvent data) {
                System.out.println(" data =" + data);
                sink.next(data);
            }
        });
    });
    return bridge;
}
Service:
public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
    logger.info(String.format("#### -> Producing message -> %s", message));
    return this.kafkaTemplate.send(topic, message);
}

@Scheduled(fixedDelay = 5000)
public void getWeatherInfoJob() throws IOException {
    logger.info("generate fake weather event");
    // fake event
    WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
    ListenableFuture<SendResult<String, WeatherInfoEvent>> future = sendMessage("weather1", event);
    try {
        if (future.get() != null) {
            future.cancel(true);
        }
    } catch (InterruptedException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (ExecutionException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
    try {
        System.out.println(" future :" + future.get().getProducerRecord().toString());
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
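A side note on the scheduled job above, which may or may not be related to the pending request: it calls future.get() twice, and it calls cancel(true) on a future that has already completed, which has no effect. Below is a minimal sketch of the same wait with a single get(). It assumes java.util.concurrent.CompletableFuture as a stand-in for Spring's ListenableFuture and a plain String as a stand-in for SendResult; the names SendAndWaitSketch and awaitResult are hypothetical and not part of the tutorial.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SendAndWaitSketch {

    // Hypothetical helper: blocks once on the future and describes the outcome.
    // The String result stands in for SendResult<String, WeatherInfoEvent>.
    static String awaitResult(Future<String> future) {
        try {
            // A single get() is enough: it returns the result or throws.
            String result = future.get();
            return "sent: " + result;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it.
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // The send failed; the cause carries the original exception.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        // Simulated successful send.
        System.out.println(awaitResult(CompletableFuture.completedFuture("weather1-0")));

        // Simulated failed send.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(awaitResult(failed));
    }
}
```

In the real job the same shape would apply to the ListenableFuture returned by sendMessage, with the cancel call dropped.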
Any idea where I might be going wrong?