Разница между бесконечным потоком Java и потоком Reactor - PullRequest
0 голосов
/ 15 октября 2018

Я пытаюсь выяснить концептуальные различия между бесконечным Потоком и бесконечным Потоком соответственно (если они есть).

В этом отношении я придумал следующие примеры для бесконечного потока/ Flux

@Test
public void infinteStream() {

  //Prints infinite number of integers
  Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1);

  infiniteStream.forEach(System.out::println);
}

@Test
public void infiniteFlux()  {

   //Prints infinite number of date strings (every second)
   Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
            .map(t -> LocalDateTime.now());

    localDateTimeFlux.subscribe(t -> System.out.println(t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))));
}

По поводу этих примеров у меня возникает вопрос: есть ли аналог для infinteStream () с Flux (и для infinteFlux () с Stream соответственно)?И вообще, есть ли различия между бесконечным потоком и потоком?

Заранее спасибо, Феликс

Ответы [ 2 ]

0 голосов
/ 16 октября 2018

Для справки, тем временем я придумал Stream-Solution для infiniteFlux ():

@Test 
public void infiniteFluxWithStream()  {

    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1).peek(x->{
    LocalDateTime t = LocalDateTime.now();
    t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
    System.out.println(t);
    });

    infiniteStream.forEach(x->{
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    }); 

}

Это действительно безобразно.Тем не менее, это показывает, что в (очень) принципе, можно переписать простые Примеры Flux в терминах Streams.

0 голосов
/ 15 октября 2018

Stream и Flux совершенно разные:

  • Stream - одноразовое использование, в то время как вы можете подписаться несколько раз на Flux
  • Streamосновано на вытягивании (использование одного элемента требует следующего), в то время как Flux имеет гибридную модель выталкивания / извлечения, в которой издатель может выдвигать элементы, но все же должен учитывать обратное давление, сигнализируемое потребителем
  • Streamявляются синхронными последовательностями, а Flux может представлять асинхронные последовательности

Например, вы генерируете бесконечную последовательность значений с помощью Stream, они создаются и потребляются максимально быстро.В вашем примере Flux вы создаете значения с фиксированным интервалом (что-то, что я не уверен, что вы можете сделать с Stream).С Flux вы также можете Flux.generate последовательности без интервалов, как ваш Stream пример.

В общем, вы можете рассматривать Flux как сочетание Stream + CompletableFuture,с:

  • множеством мощных операторов
  • поддержка противодавления
  • контроль над поведением издателя и подписчика
  • контроль над понятием времени (буферизацияокна значений, добавление таймаутов и откатов и т. д.)
  • что-то специально для асинхронных последовательностей, извлекаемых по сети (из базы данных или удаленного веб-API)
...