В чем разница между созданием Flux напрямую путем вызова Flux.push
и использованием приемника в выражении push
lambada по сравнению с использованием приемника, предоставленного DirectProcessor
?
В минимальном примере где Flux просто испускает пару событий, я мог бы сделать
Flux.<String>push(emitter -> {
emitter.next("One");
emitter.next("Two");
emitter.complete();
});
против. используя DirectProcessor
var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();
Просто чтобы уточнить: я знаю, что я мог бы использовать Flux.just
здесь, но мой вариант использования на самом деле строит мост между Spring * @EventListener
и Spring WebFlux, где я хотите создать поток для каждого входящего запроса SSE для определенного ресурса c, а затем опубликовать sh события для этого потока.
Кто-нибудь может мне сказать, если оба подхода будут действительны? Конечно, должна быть какая-то разница. В частности, в разделе Справочное руководство по реактору на DirectProcessor
указано:
С другой стороны, он имеет ограничение не обрабатывать противодавление. Как следствие, DirectProcessor сигнализирует об исключении IllegalStateException своим подписчикам, если вы пропустили через него sh N элементов, но хотя бы один из его подписчиков запросил меньше N.
Что это значит?
[EDIT:] В более ранней версии вопроса я использовал Flux.generate()
вместо Flux.push()
, что, очевидно, неверно, поскольку generate может создать не более одного события.
[РЕДАКТИРОВАТЬ 2:] @ 123 попросил у меня полный пример того, чего я пытаюсь достичь. Имейте в виду, это достаточный объем кода для SO вопроса:
Полный пример того, что я на самом деле пытаюсь сделать
Я хотел бы построить мост между (не -reactive) прослушиватель событий домена Spring и реактивный Flux, который я затем могу использовать в конечной точке WebFlux для публикации sh SSE. В следующих фрагментах кода для краткости используются аннотации Lombok .
Предположим, что в конечном итоге я хочу опубликовать sh состояние пользователя в процессе адаптации как SSE. Вот перечисление:
public enum ProcessState {
CREATED(false),
VERIFIED(false),
AUTHORIZATION_PENDING(false),
AUTHORIZED(false),
ACTIVE(true);
@Getter
private final boolean terminalState;
ProcessState(boolean terminalState) {
this.terminalState = terminalState;
}
}
Неактивные бизнес-логи c будут публиковать sh StateChangedEvents
всякий раз, когда изменяется состояние любого пользователя:
@Data
@RequiredArgsConstructor
public class StateChangedEvent {
private final UUID userId;
private final ProcessState newState;
}
И это это откуда мой оригинальный вопрос. Как бы я построить мост, который переводит события этого домена в поток Flux? Мои требования:
- Текущее состояние процесса должно быть отправлено, как только новый клиент зарегистрируется
- Поток потока должен завершаться всякий раз, когда достигается состояние "терминала" на входе.
Это то, что я до сих пор получил:
@Component
@RequiredArgsConstructor
class EventBridge {
@RequiredArgsConstructor(access = PRIVATE)
private static class Subscriber {
private final UUID userId;
private final FluxSink<ProcessState> sink;
private boolean eventEmitted;
}
private final UserRepository repository;
private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();
@EventListener
void stateChanged(StateChangedEvent event) {
notifySubscribers(event);
}
Flux<ProcessState> register(UUID userId) {
return Flux.push(emitter -> addSubscriber(userId, emitter));
}
private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
var subscriptionId = randomUUID();
var subscriber = new Subscriber(userId, sink);
subscribers.put(subscriptionId, subscriber);
sink
.onRequest(n -> poll(subscriber))
.onDispose(() -> removeSubscriber(subscriptionId));
return subscriber;
}
private void poll(Subscriber subscriber) {
emit(subscriber, loadCurrentState(subscriber), true);
}
private ProcessState loadCurrentState(Subscriber subscriber) {
return repository.findById(subscriber.userId).getProcessState();
}
private void removeSubscriber(UUID subscriptionId) {
subscribers.remove(subscriptionId);
}
private void notifySubscribers(StateChangedEvent event) {
subscribers.values().stream()
.filter(subscriber -> subscriber.userId.equals(event.getUserId()))
.forEach(subscriber -> emit(subscriber, event.getNewState(), false));
}
private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
synchronized (subscriber) {
if (onlyIfFirst && subscriber.eventEmitted) {
return;
}
subscriber.sink.next(processState);
if (processState.isTerminalState()) {
subscriber.sink.complete();
}
subscriber.eventEmitted = true;
}
}
}
И, наконец, контроллер, где используется мост:
@RestController
@RequiredArgsConstructor
class UserController {
private final EventBridge eventBridge;
@GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
}
}
Есть пара проблем в моем коде моста, я не могу обернуть голову:
Мне действительно нужно синхронизировать мой экземпляр Subscriber
, чтобы избежать записи устаревших событий из poll
в начальном состоянии? Если это не так, случается так, что событие StateChange прибывает и публикуется за до , текущее состояние читается из хранилища, то есть , а затем выводится из строя. Конечно, должен быть более элегантный способ Flux-i sh для обработки этого без синхронизированного ключевого слова.
Мы уже исключили Flux.generate
, похоже, он работает с Flux.push
, Flux.create
будет генерировать намного больше событий SSE? Почему? Боюсь, я не понимаю различий между ними.
Вместо того, чтобы использовать методы stati c на Flux
, если я использую DirectProcessor
или любой другой процессор Вот? Я новичок во всем реактивном стеке, и документация по Spring Reactor слишком расплывчата для меня. Опять же: в чем различия? Как насчет этого комментария о противодействии давлению, о котором я упоминал выше?