Различные обработки потоков для Flux.fromIterable в качестве результата метода контроллера - PullRequest
0 голосов
/ 10 мая 2018

Моя среда:

Spring Boot 2.0.1.RELEASE с webflux, работающим на контейнере сервлетов через Tomcat.Я определил один контроллер с кодом ниже.

package com.demo.controller;

import com.demo.service.pojo.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

import java.util.*;
import java.util.function.*;

@RestController
@CrossOrigin
@RequestMapping(value = "/reactive")
public class ReactiveInfoController {
    private final Logger logger = LoggerFactory.getLogger(ReactiveInfoController.class);

    private final List<User> users;

    public ReactiveInfoController() {
        final User user1 = new User();
        user1.setName("AN");
        user1.setSurname("AS");

        final User user2 = new User();
        user2.setName("BN");
        user2.setSurname("BS");

        final User user3 = new User();
        user3.setName("CN");
        user3.setSurname("CS");

        this.users = new ArrayList<>();
        users.add(user1);
        users.add(user2);
        users.add(user3);
    }

    @GetMapping("/user/list/a")
    public Flux<User> listUsersA() {
        final Iterable<User> userIterable = new Iterable<User>() {
            @Override
            public Iterator<User> iterator() {
                final Iterator<User> iterator = users.iterator();
                return new Iterator<User>() {
                    @Override
                    public boolean hasNext() {
                        return iterator.hasNext();
                    }

                    @Override
                    public User next() {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            logger.error("Error while sleeping.", e);
                        }
                        return iterator.next();
                    }
                };
            }

            @Override
            public void forEach(final Consumer<? super User> action) {
            }

            @Override
            public Spliterator<User> spliterator() {
                return null;
            }
        };

        return Flux.fromIterable(userIterable).doOnNext(user -> {
            logger.info("[ZZZ] received: " + user.getName() + " " + user.getSurname());
        });
    }

    @GetMapping("/user/list/b")
    public Flux<User> listUsersB() {
        final Iterable<User> userIterable = new Iterable<User>() {
            @Override
            public Iterator<User> iterator() {
                final Iterator<User> iterator = users.iterator();
                return new Iterator<User>() {
                    @Override
                    public boolean hasNext() {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            logger.error("Error while sleeping.", e);
                        }
                        return iterator.hasNext();
                    }

                    @Override
                    public User next() {
                        return iterator.next();
                    }
                };
            }

            @Override
            public void forEach(final Consumer<? super User> action) {
            }

            @Override
            public Spliterator<User> spliterator() {
                return null;
            }
        };

        return Flux.fromIterable(userIterable).doOnNext(user -> {
            logger.info("[ZZZ] received: " + user.getName() + " " + user.getSurname());
        });
    }
}

Как видите, единственная разница между конечными точками - Thread.sleep() call.В A система ожидает получения значения, а в B система ожидает получения информации о том, будет ли доступно какое-либо другое значение.Этот метод показывает два возможных сценария:

A.Система знает, сколько предметов будет отправлено, или знает, что еще будет больше.Поэтому итератор hasNext() немедленно возвращает true.Затем система блокируется на одну секунду по методу next(), ожидая отправки следующего элемента.

B.Система не знает, сколько элементов будет отправлено или еще больше.Он будет знать, что у него будет больше данных, когда они придут, или информация о том, что больше элементов не будет отправлено, была доставлена.Так что блокируется по методу hasNext().Метод next() возвращает значение сразу же, как оно было получено во время вызова метода hasNext().

Проблема

  1. Вызов конечной точки /demo/reactive/user/list/a

команда:

curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a

результат:

data:{"name":"AN","surname":"AS"}
data:{"name":"BN","surname":"BS"}
data:{"name":"CN","surname":"CS"}

logs:

2018-05-10 12:17:00,965 INFO  [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-5) [ZZZ] received: AN AS
2018-05-10 12:17:01,967 INFO  [com.demo.controller.ReactiveInfoController] (MvcAsync8) [ZZZ] received: BN BS
2018-05-10 12:17:02,971 INFO  [com.demo.controller.ReactiveInfoController] (MvcAsync9) [ZZZ] received: CN CS

Как и ожидалось, после показа первой строки 1 с, после показа второй строки 1 с после и после следующей 1с третьей строки.Из журналов видно, что пользователи были получены один за другим с интервалом 1 с.Более того, они обрабатывались разными потоками.

Вызывающая конечная точка /demo/reactive/user/list/b

Команда:

curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/b

Результат:

data:{"name":"AN","surname":"AS"}
data:{"name":"BN","surname":"BS"}
data:{"name":"CN","surname":"CS"}

Журналы:

2018-05-10 12:20:12,321 INFO  [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: AN AS
2018-05-10 12:20:13,321 INFO  [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: BN BS
2018-05-10 12:20:14,322 INFO  [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: CN CS

Как и следовало ожидать, через 3 секунды все линии появляются одновременно.Нет потоковой передачи.Но из журналов видно, что пользователи были получены один за другим с интервалом 1 с.А также они были обработаны только одним потоком.

Вопрос

Это нормальное поведение или это ошибка?Я не знаю, пропускаю ли я некоторые необходимые знания или эти два запроса должны вести себя одинаково (оба должны передавать результаты в потоковом режиме, когда они готовы).

EDIT 1

Я постараюсь лучше показать, что происходит с временной шкалой.

A.

+ command execution start: curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a
| [1s]
+ output shows: data:{"name":"AN","surname":"AS"}
| [1s]
+ output shows: data:{"name":"BN","surname":"BS"}
| [1s]
+ output shows: data:{"name":"CN","surname":"CS"}
+ command execution stop

B.

+ command execution start: curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a
| [1s]
| [1s]
| [1s]
+ output shows: data:{"name":"AN","surname":"AS"}
+ output shows: data:{"name":"BN","surname":"BS"}
+ output shows: data:{"name":"CN","surname":"CS"}
+ command execution stop

Надеюсь, это показывает разницу.

...