SSE сервер отправляет события в пакете при финальном закрытии - PullRequest
0 голосов
/ 26 октября 2018

У меня сервер Jersey, работающий локально, он предоставляет ресурс SSE, как показано здесь: https://jersey.github.io/documentation/latest/sse.html. У меня есть локальное веб-приложение Angular, которое связывается с открытой конечной точкой GET и прослушивает данные. На GET я запускаю поток для отправки уведомлений через равные промежутки времени в течение 6-8 секунд. Я ничего не вижу на клиенте, пока объект EventOutput не закрыт.

Что я делаю не так и как я могу это исправить?

Серверный код работает с простым завитком, т. Е .: curl http://localhost:8002/api/v1/notify

Но и в Chrome, и в Safari следующий код демонстрирует поведение

Клиент (TypeScript):

this.evSource = new EventSource('http://localhost:8002/api/v1/notify');
this.evSource.addEventListener(
  'event',
  (x => console.log('we have ', x))
);
this.evSource.onmessage = (data => console.log(data));
this.evSource.onopen = (data => console.log(data));
this.evSource.onerror = (data => {
  console.log(data);
  this.evSource.close();
});

Сервер (Java):

// cache callback
public void eventCallback(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events) {
    for (CacheEntryEvent<? extends Integer, ? extends Integer> x : events) {
        LOGGER.info("{} Sending the following value: " + x.getValue(), Thread.currentThread().getId());
        final OutboundEvent sseEvent = new OutboundEvent.Builder().name("event")
                .data(Integer.class, x.getValue()).build();
        this.broadcaster.broadcast(sseEvent);
    }
}

@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
@ApiOperation(value = "Setup SSE pipeline", notes = "Sets up the notification pipeline for clients to access")
@ApiResponses(value = {
        @ApiResponse(code = HttpURLConnection.HTTP_UNAUTHORIZED,
                message = "Missing, bad or untrusted cookie"),
        @ApiResponse(code = HttpURLConnection.HTTP_OK,
                message = "Events streamed successfully")
})
@Timed
@ResponseMetered
public EventOutput registerNotificationEvents(
        @HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) String lastEventId,
        @QueryParam(SseFeature.LAST_EVENT_ID_HEADER) String lastEventIdQuery) {
    if (!Strings.isNullOrEmpty(lastEventId) || !Strings.isNullOrEmpty(lastEventIdQuery)) {
        LOGGER.info("Found Last-Event-ID header: {}", !Strings.isNullOrEmpty(lastEventId) ? lastEventId : lastEventIdQuery );
    }
    LOGGER.info("{} Received request", Thread.currentThread().getId());
    this.continuation = true;
    final EventOutput output = new EventOutput();

    broadcaster.add(output);

    Random rand = new Random();
    IntStream rndStream = IntStream.generate(() -> rand.nextInt(90));
    List<Integer> lottery = rndStream.limit(15).boxed().collect(Collectors.toList());
    IgniteCache<Integer, Integer> cache = this.ignite.cache(topic_name);

    executorService.execute(() -> {
        try {
            lottery.forEach(value -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                    LOGGER.info("{} Sending the following value to Ignite: " + value + " : " + count++, Thread.currentThread().getId());
                    if (!cache.isClosed()) {
                        cache.put(1, value);
                    }
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            TimeUnit.MILLISECONDS.sleep(500);
            continuation = false;
            TimeUnit.MILLISECONDS.sleep(500);
            if (!output.isClosed()) {
                // THIS is where the client sees ALL the data broadcast
                // in one shot
                output.close();
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    });
    LOGGER.info("{} Completing request", Thread.currentThread().getId());
    return output;
    }
}

1 Ответ

0 голосов
/ 31 октября 2018

Похоже, http://github.com/dropwizard/dropwizard/issues/1673 фиксирует проблему. GZip default не будет сбрасываться, даже если об этом попросят верхние уровни. Решение это что-то вроде

((AbstractServerFactory)configuration.getServerFactory()).getGzipFilterFactory().setSyncFlush(true);

включит сброс для синхронизации с GZip, если отключение GZip all up не является опцией

...