Чередование кода пользователя с кодом библиотеки в Project Reactor - PullRequest
0 голосов
/ 14 марта 2020

Я работаю над библиотекой, которая возвращает пользователям Flux<Message>. После того, как они выполнили некоторые операции над этим Message, я должен выполнить асинхронную операцию, чтобы подтвердить это на стороне сервера. Возможно ли чередовать код пользователя и код библиотеки?

В моей текущей реализации я создал FluxProcessor<Message, Message>, но когда я вызываю service.complete(message).block(), он выдает IllegalStateException "block () / blockFirst ( ) / blockLast () - это блокировка, которая не поддерживается в потоке single-1 ".

Мы должны запланировать наши операции на стороне сервера для одного планировщика, поэтому я понимаю, почему он выбрасывает.

Есть ли операторы, которые я мог бы использовать? Любые предложения будут полезны!

// This is the public facing API that the users would interact with.
public class Receiver {
    public Flux<Message> receive() {
        return streamMessagesFromService().subscribeWith(new MessageProcessor());
    }
}

class MessageProcessor extends FluxProcessor<Message, Message> implements Subscription {
    // This is the server-side complete operation.
    private final Function<Message, Mono<Void>> completeFunction;

    private volatile CoreSubscriber<? super Message> downstream;

    @Override
    public void onNext(Message message) {
        // Let the user do whatever with that message downstream.
        downstream.onNext(message);

        // This is where it throws the IllegalStateException.
        completeFunction.apply(message).block();
    }
}
...