Моя среда:
Spring Boot 2.0.1.RELEASE
с webflux, работающим на контейнере сервлетов через Tomcat.Я определил один контроллер с кодом ниже.
package com.demo.controller;
import com.demo.service.pojo.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.*;
import java.util.function.*;
@RestController
@CrossOrigin
@RequestMapping(value = "/reactive")
public class ReactiveInfoController {
private final Logger logger = LoggerFactory.getLogger(ReactiveInfoController.class);
private final List<User> users;
public ReactiveInfoController() {
final User user1 = new User();
user1.setName("AN");
user1.setSurname("AS");
final User user2 = new User();
user2.setName("BN");
user2.setSurname("BS");
final User user3 = new User();
user3.setName("CN");
user3.setSurname("CS");
this.users = new ArrayList<>();
users.add(user1);
users.add(user2);
users.add(user3);
}
@GetMapping("/user/list/a")
public Flux<User> listUsersA() {
final Iterable<User> userIterable = new Iterable<User>() {
@Override
public Iterator<User> iterator() {
final Iterator<User> iterator = users.iterator();
return new Iterator<User>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public User next() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
logger.error("Error while sleeping.", e);
}
return iterator.next();
}
};
}
@Override
public void forEach(final Consumer<? super User> action) {
}
@Override
public Spliterator<User> spliterator() {
return null;
}
};
return Flux.fromIterable(userIterable).doOnNext(user -> {
logger.info("[ZZZ] received: " + user.getName() + " " + user.getSurname());
});
}
@GetMapping("/user/list/b")
public Flux<User> listUsersB() {
final Iterable<User> userIterable = new Iterable<User>() {
@Override
public Iterator<User> iterator() {
final Iterator<User> iterator = users.iterator();
return new Iterator<User>() {
@Override
public boolean hasNext() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
logger.error("Error while sleeping.", e);
}
return iterator.hasNext();
}
@Override
public User next() {
return iterator.next();
}
};
}
@Override
public void forEach(final Consumer<? super User> action) {
}
@Override
public Spliterator<User> spliterator() {
return null;
}
};
return Flux.fromIterable(userIterable).doOnNext(user -> {
logger.info("[ZZZ] received: " + user.getName() + " " + user.getSurname());
});
}
}
Как видите, единственная разница между конечными точками - Thread.sleep()
call.В A
система ожидает получения значения, а в B
система ожидает получения информации о том, будет ли доступно какое-либо другое значение.Этот метод показывает два возможных сценария:
A.Система знает, сколько предметов будет отправлено, или знает, что еще будет больше.Поэтому итератор hasNext()
немедленно возвращает true.Затем система блокируется на одну секунду по методу next()
, ожидая отправки следующего элемента.
B.Система не знает, сколько элементов будет отправлено или еще больше.Он будет знать, что у него будет больше данных, когда они придут, или информация о том, что больше элементов не будет отправлено, была доставлена.Так что блокируется по методу hasNext()
.Метод next()
возвращает значение сразу же, как оно было получено во время вызова метода hasNext()
.
Проблема
- Вызов конечной точки
/demo/reactive/user/list/a
команда:
curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a
результат:
data:{"name":"AN","surname":"AS"}
data:{"name":"BN","surname":"BS"}
data:{"name":"CN","surname":"CS"}
logs:
2018-05-10 12:17:00,965 INFO [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-5) [ZZZ] received: AN AS
2018-05-10 12:17:01,967 INFO [com.demo.controller.ReactiveInfoController] (MvcAsync8) [ZZZ] received: BN BS
2018-05-10 12:17:02,971 INFO [com.demo.controller.ReactiveInfoController] (MvcAsync9) [ZZZ] received: CN CS
Как и ожидалось, после показа первой строки 1 с, после показа второй строки 1 с после и после следующей 1с третьей строки.Из журналов видно, что пользователи были получены один за другим с интервалом 1 с.Более того, они обрабатывались разными потоками.
Вызывающая конечная точка
/demo/reactive/user/list/b
Команда:
curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/b
Результат:
data:{"name":"AN","surname":"AS"}
data:{"name":"BN","surname":"BS"}
data:{"name":"CN","surname":"CS"}
Журналы:
2018-05-10 12:20:12,321 INFO [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: AN AS
2018-05-10 12:20:13,321 INFO [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: BN BS
2018-05-10 12:20:14,322 INFO [com.demo.controller.ReactiveInfoController] (http-nio-20012-exec-6) [ZZZ] received: CN CS
Как и следовало ожидать, через 3 секунды все линии появляются одновременно.Нет потоковой передачи.Но из журналов видно, что пользователи были получены один за другим с интервалом 1 с.А также они были обработаны только одним потоком.
Вопрос
Это нормальное поведение или это ошибка?Я не знаю, пропускаю ли я некоторые необходимые знания или эти два запроса должны вести себя одинаково (оба должны передавать результаты в потоковом режиме, когда они готовы).
EDIT 1
Я постараюсь лучше показать, что происходит с временной шкалой.
A.
+ command execution start: curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a
| [1s]
+ output shows: data:{"name":"AN","surname":"AS"}
| [1s]
+ output shows: data:{"name":"BN","surname":"BS"}
| [1s]
+ output shows: data:{"name":"CN","surname":"CS"}
+ command execution stop
B.
+ command execution start: curl -H 'Accept: text/event-stream' http://localhost:8001/demo/reactive/user/list/a
| [1s]
| [1s]
| [1s]
+ output shows: data:{"name":"AN","surname":"AS"}
+ output shows: data:{"name":"BN","surname":"BS"}
+ output shows: data:{"name":"CN","surname":"CS"}
+ command execution stop
Надеюсь, это показывает разницу.