Я пытаюсь написать сложную систему испускания сущностей с XML-парсером под ней и не могу правильно понять обратное давление.В настоящее время у меня есть следующий интерфейс (который реализует мой анализатор):
public interface Reader<T> {
void read(T chunk, Recorder recorder) throws FatalException;
default Flowable<Object> toFlowable(Flowable<T> input) {
return input.flatMap(chunk -> {
Recorder recorder = new Accumulator();
read(chunk, recorder);
var combination = Stream.concat(recorder.values.stream(), recorder.errors.stream().map(ErrorEvent::new))
.collect(Collectors.toList());
return Flowable.fromIterable(combination);
}, false, 128, 1); // note the buffer size of one
}
}
Где chunk - это один тег XML в реализации, поэтому мой анализатор может записывать 0..1 элементов за один вызов.Я хочу полностью приостановить чтение, пока подписчик не запросит новый элемент.
Я проверяю его следующим образом:
var reader = new XmlReader(handlers);
// collection of XML tags in form of
// <root><child>content</child><child>content</child>...
var source = Flowable.fromIterable(standardDocument());
var stream = reader.toFlowable(source);
stream.subscribe(new DisposableSubscriber<>() {
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Object o) {}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
verify(root, times(1)).getChildHandler("child"); // means only one <child> tag was encountered
Однако проверка завершается неудачно со следующей ошибкой подтверждения:
entityHandler.getChildHandler("child");
Wanted 1 time:
-> at redacted.reader.xml.XmlReaderTest.respectsBackpressure(XmlReaderTest.java:156)
But was 18 times. Undesired invocation:
-> at redacted.reader.xml.XmlReader.lambda$handleStart$1(XmlReader.java:52)
«18 раз» - это максимальное число, независимо от того, сколько раз мой поддельный документ будет повторять тег <child>
, поэтому создается впечатление, что есть буфер.Однако, как вы можете видеть в приведенном выше коде, я устанавливаю размер буфера равным единице.Что я пропустил и как заставить мой анализатор приостановить работу, пока подписчик не запросит новый элемент?