Я не думаю, что API дает гарантии, которые вы ищете здесь.
stream.pipeline
вызывает свой обратный вызов после завершения записи всех данных. Поскольку данные были записаны в новый поток Transform (ваш Passthrough), и этому потоку еще некуда помещать данные, он просто сохраняется во внутреннем буфере потока. Этого достаточно для конвейера.
Если вы хотите прочитать достаточно большой файл , заполняя буфер потока преобразования, то обратное давление потока может автоматически вызвать pause()
для читаемого файла, который читает файл. Как только поток Transform истощится, он автоматически unpause()
станет доступным для чтения, поэтому поток данных возобновится.
Я думаю, что ваш пример делает два неверных предположения:
(1) что вы можете приостановить трансформационный поток. Согласно stream docs , приостановка любого потока, который передается в пункт назначения , неэффективна, потому что он немедленно отключит себя, как только переданный по трубопроводу пункт назначения запросит дополнительные данные. Кроме того, приостановленный поток преобразования все еще считывает данные! Приостановленный поток просто не записывает данные.
(2) что пауза дальше по конвейеру каким-то образом распространяется до передней части конвейера и вызывает прекращение потока данных. Это только правда , если вызвано противодавлением, то есть вам нужно будет запустить обнаружение узла полного внутреннего буфера.
При работе с трубами лучше предположить, что у вас есть ручное управление двумя самыми дальними концами, но не обязательно какой-либо из частей в середине. (Вы можете вручную pipe()
и unpipe()
подключить и отключить промежуточные потоки, но вы не можете приостановить их.)