Создание текстового потока с помощью Spring WebFlux - PullRequest
0 голосов
/ 16 ноября 2018

Я использовал Spring WebFlux для создания текстового потока, вот код:

@SpringBootApplication
@RestController
public class ReactiveServer {

    private static final String FILE_PATH = "c:/test/";


    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/events")
    Flux<String> events() {

        Flux<String> eventFlux = Flux.fromStream(Stream.generate(() -> FileReader.readFile()));

        Flux<Long> durationFlux = Flux.interval(Duration.ofMillis(500));

        return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);

    }

    public static void main(String[] args) {

        SpringApplication.run(ReactiveServer.class, args);

    }

}

Когда я получаю доступ к URL-адресу / events в браузере, я получаю это, это почти то, чтоЯ хочу получить:

data: {"type": "HystrixCommand", "name": "GetConsumerCommand", "group": "ConsumerRemoteGroup", "currentTime": 1542379993662, "isCircuitBreakerOpen": false, "errorPercentage": 0, "errorCount": 0, "requestCount": 0, "illingCountBadRequests ": 0}

data: {" type ":" HystrixCommand "," name ":" GetConsumerCommand"," group ":" ConsumerRemoteGroup "," currentTime ": 1542379994203," isCircuitBreakerOpen ": false," errorPercentage ": 0," errorCount ": 0," requestCount ": 2, RollingCountBadRequests": 0}

данные: { "Тип": "HystrixCommand", "имя": "GetConsumerCommand", "группа": "ConsumerRemoteGroup", "CURRENTTIME": 1542379994706, "isCircuitBreakerOpen" ложь "errorPercentage": 0, "ERRORCOUNT": 0, "requestCount": 2, "rollCountBadRequests": 0}

data: {"type": "HystrixCommand", "name": "GetConsumerCommand", "group": "ConsumerRemoteGroup "," currentTime ": 1542379995213," isCircuitBreakerOpen ": false," errorPercentage ": 0," errorCount ": 0," requestCount ": 3, RollingCountBadRequests: 0}

Что янеобходимо вставить «ping:» между итерациями, чтобы получить:

ping:

data: {"type": "HystrixCommand", "name": "GetConsumerCommand», "группа": "ConsumerRemoteGroup", "CURRENTTIME": 1542379993662, "isCircuitBreakerOpen" ложь "errorPercentage": 0, "ERRORCOUNT": 0, "requestCount": 0, "rollingCountBadRequests": 0}

данные: { "Тип": "HystrixCommand", "имя": "GetConsumerCommand", "группа": "ConsumerRemoteGroup", "CURRENTTIME": 1542379994203, "isCircuitBreakerOpen" ложь "errorPercentage": 0, "ERRORCOUNT": 0," requestCount ": 2," rollCountBadRequests ": 0}

ping:

data: {" type ":" HystrixCommand "," name ":" GetConsumerCommand ","группа ":" ConsumerRemoteGroup», "CURRENTTIME": 1542379994706, "isCircuitBreakerOpen" ложь "errorPercentage": 0, "ErrorCount": 0, "requestCount": 2, "rollingCountBadRequests": 0}

data: {"type": "HystrixCommand", "name": "GetConsumerCommand", "group": "ConsumerRemoteGroup", "currentTime": 1542379995213, "isCircuitBreakerOpen": false, "errorPercentage": 0,«errorCount»: 0, «requestCount»: 3, RollingCountBadRequests: 0}

Но лучшее, что я мог получить, было:

data: ping:

data: {"type": "HystrixCommand", "name": "GetConsumerCommand", "group": "ConsumerRemoteGroup", "currentTime": 1542379993662, "isCircuitBreakerOpen": false, "errorPercentage": 0,«errorCount»: 0, «requestCount»: 0, RollingCountBadRequests: 0}

data: {«type»: «HystrixCommand», «name»: «GetConsumerCommand», «group»: «ConsumerRemoteGroup», "currentTime": 1542379994203, "isCircuitBreakerOpen": false, "errorPercentage": 0, "errorCount": 0, "requestCount": 2, "illingCountBadRequests ": 0}

данные: пинг:

данные: { "Тип": "HystrixCommand", "имя": "GetConsumerCommand", "группа": "ConsumerRemoteGroup", "CURRENTTIME": 1542379994706, "isCircuitBreakerOpen" ложь "errorPercentage": 0, "errorCount ": 0," requestCount ": 2, RollingCountBadRequests: 0}

data: {" type ":" HystrixCommand "," name ":" GetConsumerCommand "," group ":" ConsumerRemoteGroup ",«currentTime»: 1542379995213, «isCircuitBreakerOpen»: false, «errorPercentage»: 0, «errorCount»: 0, «requestCount»: 3, RollingCountBadRequests »: 0}

Кто-нибудь знает опуть к тому, что мне нужно?

1 Ответ

0 голосов
/ 18 ноября 2018

Вы можете попытаться вернуть Flux<ServerSentEvent> и указать тип события, которое вы пытаетесь отправить. Как это:

@RestController
public class TestController {

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, path = "/events")
    Flux<ServerSentEvent> events() {
        Flux<String> events = Flux.interval(Duration.ofMillis(200)).map(String::valueOf);
        Flux<ServerSentEvent<String>> sseData = events.map(event -> ServerSentEvent.builder(event).build());
        Flux<ServerSentEvent<String>> ping = Flux.interval(Duration.ofMillis(500))
                .map(l -> ServerSentEvent.builder("").event("ping").build());
        return Flux.merge(sseData, ping);
    }
}

С этим фрагментом кода я получаю следующий вывод:

$ http --stream :8080/events
HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8
transfer-encoding: chunked

data:0

data:1

event:ping
data:

data:2

data:3

data:4

event:ping
data:

Что соответствует Отправленные сервером события . Префикс ping: специфичен для Hystrix? Если это так, я не думаю, что это согласуется со спецификацией SSE и что она поддерживается в Spring Framework.

...