Реактивный запрос HTTP GET SpringBoot + Kafka, ожидающий обработки - PullRequest
0 голосов
/ 20 апреля 2020

Я пробую пример интеграции SpringBoot Reactive с Kafka и Angular, как упомянуто в этом уроке https://raymondhlee.wordpress.com/2019/11/23/a-reactive-stack-with-spring-boot-kafka-and-angular/

Однако ответ на вызов GET http://localhost:8181/weather никогда не возвращается из этого метода ниже. И статус остается в ожидании.

Ctrl:

private Flux<WeatherInfoEvent> bridge;

    public WeatherInfoController() {
         // (3) Broadcast to several subscribers
         this.bridge = createBridge().publish().autoConnect().cache(10).log();
    }

    // (1) Spring MVC annotation
    @GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
    public Flux<WeatherInfoEvent> getWeatherInfo() {
        System.out.println(" @$@$#@$ Hey i was called....");
        System.out.println(" @#$#@$# bridge.toString()"+bridge.getPrefetch() + bridge.toString());
         return bridge;
    }

    private Flux<WeatherInfoEvent> createBridge() {
         Flux<WeatherInfoEvent> bridge = Flux.create(sink -> { // (2)
              processor.register(new WeatherInfoEventListener() {

                  @Override
                  public void processComplete() {
                      sink.complete();
                  }

                  @Override
                  public void onData(WeatherInfoEvent data) {
                      System.out.println(" data ="+data);
                      sink.next(data);
                  }
               });
         });
         return bridge;
    }

Служба:

public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        return this.kafkaTemplate.send(topic, message);
    }

    @Scheduled(fixedDelay = 5000)
    public void getWeatherInfoJob() throws IOException {
        logger.info("generate fake weather event");
        // fake event
        WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
         ListenableFuture<SendResult<String, WeatherInfoEvent>> future = sendMessage("weather1", event);
         try {
            if(future.get()!=null) {
                 future.cancel(true);
             }
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (ExecutionException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
         try {
            System.out.println(" future :"+future.get().getProducerRecord().toString());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

Любая идея, где я может быть не так?

...