Реализация обратного давления в пользовательском конвейере RxJava - PullRequest
0 голосов
/ 07 мая 2019

Я пытаюсь написать сложную систему испускания сущностей с XML-парсером под ней и не могу правильно понять обратное давление.В настоящее время у меня есть следующий интерфейс (который реализует мой анализатор):

public interface Reader<T> {
    void read(T chunk, Recorder recorder) throws FatalException;

    default Flowable<Object> toFlowable(Flowable<T> input) {
        return input.flatMap(chunk -> {
            Recorder recorder = new Accumulator();
            read(chunk, recorder);
            var combination = Stream.concat(recorder.values.stream(), recorder.errors.stream().map(ErrorEvent::new))
                    .collect(Collectors.toList());
            return Flowable.fromIterable(combination);
        }, false, 128, 1); // note the buffer size of one
    }
}

Где chunk - это один тег XML в реализации, поэтому мой анализатор может записывать 0..1 элементов за один вызов.Я хочу полностью приостановить чтение, пока подписчик не запросит новый элемент.

Я проверяю его следующим образом:

var reader = new XmlReader(handlers);
// collection of XML tags in form of
// <root><child>content</child><child>content</child>...
var source = Flowable.fromIterable(standardDocument());
var stream = reader.toFlowable(source);
stream.subscribe(new DisposableSubscriber<>() {
    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Object o) {}

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});

verify(root, times(1)).getChildHandler("child"); // means only one <child> tag was encountered

Однако проверка завершается неудачно со следующей ошибкой подтверждения:

entityHandler.getChildHandler("child");
Wanted 1 time:
-> at redacted.reader.xml.XmlReaderTest.respectsBackpressure(XmlReaderTest.java:156)
But was 18 times. Undesired invocation:
-> at redacted.reader.xml.XmlReader.lambda$handleStart$1(XmlReader.java:52)

«18 раз» - это максимальное число, независимо от того, сколько раз мой поддельный документ будет повторять тег <child>, поэтому создается впечатление, что есть буфер.Однако, как вы можете видеть в приведенном выше коде, я устанавливаю размер буфера равным единице.Что я пропустил и как заставить мой анализатор приостановить работу, пока подписчик не запросит новый элемент?

...