Как остановить поток, который блокирует именованный канал в Python? - PullRequest
0 голосов
/ 10 декабря 2018

У меня есть класс, который подклассы threading.Thread.Это единственная ответственность - помещать сообщения, прочитанные из именованного канала UNIX, в объект queue.Queue (чтобы другие потоки могли обрабатывать эти значения позже).

Пример кода:

class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        Thread.__init__(self)
        self._stop_event = Event()
        self._results_queue = results_queue
        self._pipe_path = pipe_path

    def run(self):
        while not self._stop_event.is_set():
            with open(self._pipe_path, 'r') as pipe:
                message = pipe.read()
            self._results_queue.put(message, block=True)

    def stop(self):
        self._stop_event.set()

Как выможно видеть, что я хотел использовать объект threading.Event для остановки цикла, но поскольку вызовы open() или read() в именованном канале будут блокироваться (пока кто-то не откроет канал для записи / записи в него, а затем закроет его),поток никогда не может остановиться.

Я не хотел использовать неблокирующий режим для именованного канала, поскольку блокировка на самом деле , что я хочу в том смысле, в котором я хочуподождите, пока кто-нибудь откроет и напишет в канал.

С сокетами я бы попробовал что-то вроде установки флага времени ожидания на сокете, но я не смог найти никакого способа сделать это для именованных каналов.Я также подумал о том, чтобы просто хладнокровно убить нить, не дав ей возможности изящно остановиться, но на самом деле это не то, чем я должен заниматься, и я даже не знаю, предоставляет ли Python какой-либо способ сделать это..

Как правильно остановить этот поток, чтобы потом можно было вызывать join() для него?

1 Ответ

0 голосов
/ 29 декабря 2018

Классический способ сделать это - иметь безымянный канал, который сигнализирует о закрытии, и использовать select, чтобы узнать, какой из них будет использоваться.

select будет блокироваться, пока один из дескрипторов не будет готов для чтенияи затем вы можете использовать os.read, который не будет блокировать в этом случае.

Код для демонстрации (не обрабатывает ошибки, может пропустить дескрипторы):

class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        Thread.__init__(self)
        self._stop_pipe_r, self._stop_pipe_w = os.pipe()
        self._results_queue = results_queue
        self._pipe = os.open(pipe_path, os.O_RDONLY) # use file descriptors directly to read file in parts
        self._buffer = b''

    def run(self):
        while True:
            result = select.select([self._stop_pipe_r, self._pipe], [], [])
            if self._stop_pipe_r in result[0]:
                os.close(self._stop_pipe_r)
                os.close(self._stop_pipe_w)
                os.close(self._pipe)
                return
            self._buffer += os.read(self._pipe, 4096) # select above guarantees read is noblocking
            self._extract_messages_from_buffer() # left as an exercise

    def stop(self):
        os.write(self._stop_pipe_w, b'c')
...