У меня сервер Jersey, работающий локально, он предоставляет ресурс SSE, как показано здесь: https://jersey.github.io/documentation/latest/sse.html. У меня есть локальное веб-приложение Angular, которое связывается с открытой конечной точкой GET и прослушивает данные.
На GET я запускаю поток для отправки уведомлений через равные промежутки времени в течение 6-8 секунд. Я ничего не вижу на клиенте, пока объект EventOutput не закрыт.
Что я делаю не так и как я могу это исправить?
Серверный код работает с простым завитком, т. Е .:
curl http://localhost:8002/api/v1/notify
Но и в Chrome, и в Safari следующий код демонстрирует поведение
Клиент (TypeScript):
this.evSource = new EventSource('http://localhost:8002/api/v1/notify');
this.evSource.addEventListener(
'event',
(x => console.log('we have ', x))
);
this.evSource.onmessage = (data => console.log(data));
this.evSource.onopen = (data => console.log(data));
this.evSource.onerror = (data => {
console.log(data);
this.evSource.close();
});
Сервер (Java):
// cache callback
public void eventCallback(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events) {
for (CacheEntryEvent<? extends Integer, ? extends Integer> x : events) {
LOGGER.info("{} Sending the following value: " + x.getValue(), Thread.currentThread().getId());
final OutboundEvent sseEvent = new OutboundEvent.Builder().name("event")
.data(Integer.class, x.getValue()).build();
this.broadcaster.broadcast(sseEvent);
}
}
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
@ApiOperation(value = "Setup SSE pipeline", notes = "Sets up the notification pipeline for clients to access")
@ApiResponses(value = {
@ApiResponse(code = HttpURLConnection.HTTP_UNAUTHORIZED,
message = "Missing, bad or untrusted cookie"),
@ApiResponse(code = HttpURLConnection.HTTP_OK,
message = "Events streamed successfully")
})
@Timed
@ResponseMetered
public EventOutput registerNotificationEvents(
@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) String lastEventId,
@QueryParam(SseFeature.LAST_EVENT_ID_HEADER) String lastEventIdQuery) {
if (!Strings.isNullOrEmpty(lastEventId) || !Strings.isNullOrEmpty(lastEventIdQuery)) {
LOGGER.info("Found Last-Event-ID header: {}", !Strings.isNullOrEmpty(lastEventId) ? lastEventId : lastEventIdQuery );
}
LOGGER.info("{} Received request", Thread.currentThread().getId());
this.continuation = true;
final EventOutput output = new EventOutput();
broadcaster.add(output);
Random rand = new Random();
IntStream rndStream = IntStream.generate(() -> rand.nextInt(90));
List<Integer> lottery = rndStream.limit(15).boxed().collect(Collectors.toList());
IgniteCache<Integer, Integer> cache = this.ignite.cache(topic_name);
executorService.execute(() -> {
try {
lottery.forEach(value -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
LOGGER.info("{} Sending the following value to Ignite: " + value + " : " + count++, Thread.currentThread().getId());
if (!cache.isClosed()) {
cache.put(1, value);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
TimeUnit.MILLISECONDS.sleep(500);
continuation = false;
TimeUnit.MILLISECONDS.sleep(500);
if (!output.isClosed()) {
// THIS is where the client sees ALL the data broadcast
// in one shot
output.close();
}
} catch (InterruptedException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
});
LOGGER.info("{} Completing request", Thread.currentThread().getId());
return output;
}
}