Я видел в Java концепцию реактивных потоков, которая стремится стандартизировать концепцию реактивных javaRX и Spring.Все хорошо, за исключением того факта, что для некоторого преобразования потоков вам необходимо внедрить один или несколько процессоров.Мой вопрос касается необходимости интерфейса процессора Процессор, который расширяет Subscriber, Publisher
Кажется, вы прошли весело, когда выполняете преобразование и соединяете его как с производителем, так и с подписчиком, и все!Но возникают некоторые вопросы:
- Как вы справляетесь с обратным давлением со стороны клиента / подписчика.Если клиент запрашивает 10 элементов, вы не знаете в Processor, сколько элементов вы должны запросить в дальнейшем для Producer.Я видел примеры, запрашивающие элементы 1 или Int.MAX
- В чем суть?Поскольку из того, что я наблюдал, это просто забавно, когда вы выполняете преобразование, забава передается конструктору и вызывается позже, когда элемент проходит через него (и это все).Так не могли ли мы достичь этого непосредственно у продюсера или подписчика?(я знаю, что вы хотите разделить проблемы, но вы можете устранить проблему 1)
Вы можете увидеть базовый пример здесь: https://www.concretepage.com/java/java-9/java-reactive-streams.В методе процессора onNext вы можете видеть, что процессор запрашивает 1 элемент, и это беспокоит меня: как насчет противодавления со стороны абонента?Что, если подписчик запросил 100 элементов один раз в партии?Разве процессор не должен фокусироваться только на стороне обработки и не должен запрашивать элементы?
@Override
public void onNext(Article item) {
subscription.request(1);
submit(function.apply(item));
}
Спасибо!