Хотя классы BufferedIOBase
реализации обертывают объект IOBase
, их интерфейс представляет собой поток ( все наследуется от IOBase
), поэтому обычное поведение объекта IOBase
чтобы закрыть себя, когда они выходят за рамки. BufferedIOBase
просто делегирует вызов close()
базовому потоку.
Вы не должны рассматривать BufferedReader
как обертку потока (хотя это так и реализовано), а как приведение типа существующего потока. Состояние двух потоков: полностью связаны друг с другом. Однако вы можете отменить привязку обернутого потока с помощью detach()
, но это сделает объект BufferedIOBase
бесполезным.
Кроме того, io.open
возвращает BufferedReader
уже, когда режим равен rb
, так что вы выполняете двойную буферизацию. Вы должны использовать io.FileIO
вместо.
У вас есть несколько вариантов:
Создайте новый поток и новый базовый дескриптор файла и передайте имена файлов вместо потоков. Это ваш самый простой и безопасный вариант.
Создание необработанных файловых дескрипторов и создание потоков из них по мере необходимости. Это требует некоторых
позаботьтесь о том, чтобы несколько потоков не использовали один и тот же дескриптор файла одновременно. Например:
fd = os.open('test.txt', os.O_RDONLY)
file1 = FileIO(fd, 'r', closefd=False)
file2 = FileIO(fd, 'r', closefd=False)
file1.read(100)
assert file1.tell() == 100
file2.read(100)
assert file1.tell() == 200
detach()
базовый поток до того, как ваш BufferedIOBase
объект закроет свой поток. (Не забудьте перемотать поток!)
def test(streams):
for stream in streams:
b=TestReader(stream)
do_something(b)
wrappedstream = b.detach()
assert wrappedstream is stream
Вы даже можете реализовать это в своем деструкторе:
class TestReader(BufferedReader):
def __del__(self):
self.detach()
# self.raw will not be closed,
# rather left in the state it was in at detachment
Или просто полностью отключите делегирование close()
, если считаете, что семантика неверна:
class TestReader(BufferedReader):
def close(self):
self.closed = True
У меня нет полной картины того, что вы делаете (возможно, вам нужен другой дизайн), но вот как я реализовал бы код, который я вижу:
from io import FileIO, BufferedReader
import io
import os
class TestReader(BufferedReader):
pass
def test(streams):
for stream in streams:
b = TestReader(stream)
def test_reset(streams):
"""Will try to leave stream state unchanged"""
for stream in streams:
pos = stream.tell()
b = TestReader(stream)
do_something(b)
b.detach()
stream.seek(pos)
filenames = ['test1.txt', 'test2.txt']
# option 1: just make new streams
streams = [FileIO(name, 'r') for name in filenames]
test(streams)
streams = [io.open(name, 'rb') for name in filenames]
#etc
# option 2: use file descriptors
fds = [os.open(name, os.O_RDONLY) for name in filenames]
#closefd = False means "do not close fd on __del__ or __exit__"
#this is only an option when you pass a fd instead of a file name
streams = [FileIO(fd, 'r', closefd=False) for fd in fds]
test(streams)
streams = []
for fd in fds:
os.lseek(fd, 0, os.SEEK_SET)
streams.append(io.open(fd, 'rb', closefd=False))
# you can also .seek(0) on the BufferedReader objects
# instead of os.lseek on the fds
# option 3: detach
streams = [FileIO(name, 'r') for name in filenames]
test_reset(streams)
# streams[*] should still be in the same state as when you passed it in