Элементы эмиссии потока на основе свойства даты и времени - PullRequest
1 голос
/ 28 сентября 2019

У меня есть сценарий, в котором я хочу обрабатывать события на основе свойства даты и времени в событии.

Моя неудачная попытка на данный момент:

class Data {
    private Integer id;
    private LocalDateTime sendAt;
}

List<Data> data = Arrays.asList(
        new Data(1, LocalDateTime.now().plusSeconds(1)),
        new Data(2, LocalDateTime.now().plusSeconds(2)),
        new Data(3, LocalDateTime.now().plusSeconds(3)),
        new Data(4, LocalDateTime.now().plusSeconds(5)),
        new Data(5, LocalDateTime.now().plusSeconds(8)),
        new Data(6, LocalDateTime.now().plusSeconds(13)),
        new Data(7, LocalDateTime.now().plusSeconds(21)),
        new Data(8, LocalDateTime.now().plusSeconds(34)),
        new Data(9, LocalDateTime.now().plusSeconds(55)));

Flux<Data> dataFlux = Flux.fromIterable(data);

dataFlux.takeWhile(d -> d.sendAt.isAfter( LocalDateTime.now() ))
        .subscribe(x -> System.out.println(x));

Я ожидаю событиядля печати через 1, 2, 3, 5, 8, 13, 21, ... секунд каждая.

Возможно ли это с помощью Spring WebFlux / Reactor?

1 Ответ

1 голос
/ 29 сентября 2019

Это возможно путем объединения delayUntil с Mono.delay(<custom_time>) и потоковой передачи HTTP с MediaType.APPLICATION_STREAM_JSON_VALUE или SSE MediaType.TEXT_EVENT_STREAM_VALUE:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

@RestController
public class FluxDateTime {
    @GetMapping(value = "/time", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Data> getData() {
        List<Data> data = Arrays.asList(
                new Data(1, 1),
                new Data(2, 2),
                new Data(3, 3),
                new Data(4, 5),
                new Data(5, 8),
                new Data(6, 13),
                new Data(7, 21),
                new Data(8, 34),
                new Data(9, 55));

        return Flux.fromIterable(data)
                .delayUntil(d -> Mono.delay(Duration.ofSeconds(d.getDelay())));
    }

    /* Keep in mind that empty constructor, getters/setters
       are purely only for JSON serialization */
    private final class Data {
        private int id;
        private int delay;

        public Data() {}

        public Data(int id, int delay) {
            this.id = id;
            this.delay = delay;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public int getDelay() {
            return delay;
        }

        public void setDelay(int delay) {
            this.delay = delay;
        }
    }
}

Задержки сбора потока на каждом элементе до тех пор, пока основной моно-поток не завершится.Каждый из Monos задерживается с соответствующим временем в секундах на основе данных входного потока.

В результате мы получили потоковый HTTP-ответ с желаемыми пользовательскими задержками:

$ curl http://localhost:8080/time
{"id":1,"delay":1} # after 1 sec
{"id":2,"delay":2} # after 2 sec
{"id":3,"delay":3} # and so on
{"id":4,"delay":5}
{"id":5,"delay":8}
{"id":6,"delay":13}
{"id":7,"delay":21}
{"id":8,"delay":34}
{"id":9,"delay":55}
...