Я работаю над библиотекой, которая возвращает пользователям Flux<Message>
. После того, как они выполнили некоторые операции над этим Message
, я должен выполнить асинхронную операцию, чтобы подтвердить это на стороне сервера. Возможно ли чередовать код пользователя и код библиотеки?
В моей текущей реализации я создал FluxProcessor<Message, Message>
, но когда я вызываю service.complete(message).block()
, он выдает IllegalStateException
"block () / blockFirst ( ) / blockLast () - это блокировка, которая не поддерживается в потоке single-1 ".
Мы должны запланировать наши операции на стороне сервера для одного планировщика, поэтому я понимаю, почему он выбрасывает.
Есть ли операторы, которые я мог бы использовать? Любые предложения будут полезны!
// This is the public facing API that the users would interact with.
public class Receiver {
public Flux<Message> receive() {
return streamMessagesFromService().subscribeWith(new MessageProcessor());
}
}
class MessageProcessor extends FluxProcessor<Message, Message> implements Subscription {
// This is the server-side complete operation.
private final Function<Message, Mono<Void>> completeFunction;
private volatile CoreSubscriber<? super Message> downstream;
@Override
public void onNext(Message message) {
// Let the user do whatever with that message downstream.
downstream.onNext(message);
// This is where it throws the IllegalStateException.
completeFunction.apply(message).block();
}
}