Осторожно прервите "для строки в именованной трубе" -loop - PullRequest
0 голосов
/ 20 января 2019

Я читаю из именованного канала блокирующим образом.Я хочу, чтобы мой скрипт на python реагировал на сигналы SIGTERM.

Это то, что я получил до сих пор:

#!/usr/bin/python3

import signal

def handler_stop_signals(signum, frame):
    global fifo
    fifo.close()
    exit

signal.signal(signal.SIGTERM, handler_stop_signals)

fifo = open("someNamedPipe", "r")
while True:
    for line in fifo:
        doSomething
fifo.close()
exit

Когда скрипт получает сигнал SIGTERM, он закрывает канал, как и ожидалосьно вызывает RuntimeError.

RuntimeError: reentrant call inside <_io.BufferedReader name='someNamedPipe'>

Есть ли другой способ выйти из цикла foor и аккуратно закрыть fifo?

1 Ответ

0 голосов
/ 21 января 2019

TL; DR Вам нужно неблокирующее чтение , чтобы иметь возможность контролировать завершение; asyncio с использованием aiofiles , вероятно, является наиболее элегантным решением, но у всех есть свои особенности.

Sample Producer

Я собираюсь начать скак можно было бы написать хорошо себя ведущий производитель данных в именованный канал, потому что это более простой способ ввести некоторые API.

import os
from threading import Event

class Producer:
    def __init__(self, path):
        self.path = path
        self.event = Event()

    def start(self):
        os.mkfifo(self.path)
        try:
            print('Waiting for a listener...')
            with open(self.path, 'w') as fifo:
                fifo.write('Starting the convoluted clock...\n')
                fifo.flush()

                while not self.event.wait(timeout=1):
                    print('Writing a line...')
                    fifo.write(str(datetime.now()) + '\n')
                    fifo.flush()

                fifo.write('The convoluted clock has finished.\n')
                fifo.flush()
                print('Finished.')
        finally:
            os.unlink(self.path)

    def stop(self, *args, **kwargs):
        self.event.set()

producer = Producer('/tmp/someNamedPipe')
signal.signal(signal.SIGINT, producer.stop)
signal.signal(signal.SIGTERM, producer.stop)
producer.start()

Это записывает текущую дату вименованный канал в виде строки один раз в секунду.SIGINT и SIGTERM будут корректно отключать канал, записывая The convoluted clock has finished. в качестве последней строки в канал перед закрытием.Он использует threading.Event для связи между методом stop (который будет выполняться в фоновом потоке) и start (который ждет не более одной секунды, прежде чем перейти к следующей итерации цикла).self.event.wait(timeout=1) немедленно возвращает True, если сигнал установлен, или False после ожидания не более одной секунды без устанавливаемого сигнала.

Sample (Buggy) Consumer

Было бы заманчиво использовать подобную технику для написания потребителя:

import signal, os
from threading import Event

class BuggyConsumer:
    def __init__(self, path):
        self.path = path
        self.event = Event()

    def start(self):
        with open(self.path, 'r') as fifo:
            # we'll be a bit more aggressive on checking for termination
            # because we could have new data for us at any moment!
            while not self.event.wait(0.1):
                print('Got from the producer:', fifo.readline())

            print('The consumer was nicely stopped.')
            # technically the pipe gets closed AFTER this print statement
            # because we're using a with block
        finally:
            fifo.close()

    def stop(self, *args, **kwargs):
        self.event.set()

consumer = BuggyConsumer('/tmp/someNamedPipe')
signal.signal(signal.SIGINT, consumer.stop)
signal.signal(signal.SIGTERM, consumer.stop)
consumer.start()

К сожалению, на практике это не сработает, потому что open() открывает файлы в режиме блокировки ,Это означает, что read() вызовы блокируют вызывающий поток, что по существу предотвращает «хорошие» прерывания, если вы не проверяете промежуточные вызовы read.Конкретно, если производитель прекратит производство, но оставит трубу открытой, потребитель будет вечно сидеть на fifo.readline() и никогда не сможет проверить сигнал на «хорошее» завершение.

Образец (меньше ошибок) Потребитель

В этом примере устраняется проблема, связанная с неправильным поведением производителя, который ловит потребителя в блокирующем вызове чтения, но он значительно сложнее и вынуждает вас использовать API-интерфейсы более низкого уровня, которые не так дружелюбны:

import signal, os
from threading import Event

class ComplicatedConsumer:
    def __init__(self, path):
        self.path = path
        self.event = Event()

    def start(self):
        # Open a file descriptor in a non-blocking way.
        fifo = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            while not self.event.wait(0.1):
                try:
                    # This is FAR from a comprehensive implementation.
                    # We're making some pretty yucky assumptions.
                    line = os.read(fifo, 1000).decode('utf8')
                    if line:
                        print('Got from the producer:', line)
                    else:
                        print('EOF from the producer.')
                        break
                except BlockingIOError:
                    # the call to os.read would have blocked (meaning we're
                    # caught up)
                    pass

            print('The consumer was nicely stopped.')

        finally:
            os.close(fifo)

    def stop(self, *args, **kwargs):
        self.event.set()

A правильная реализация была бы FAR более сложной, потому что этот код наивно предполагает, что:

  • каждый read() вызов из канала представляет собой одно полное сообщение;это худшее предположение.Вы можете ускорить работу производителя и увидеть, что этот менее ошибочный потребитель начинает читать несколько «строк» ​​как одну строку.
  • строка никогда не занимает более 1000 байтов;более продвинутая реализация должна была бы буферизовать «частичные» сообщения, искать новые строки и делить их соответственно

Во всех случаях, кроме самых простых и медленных вариантов использования (как, скажем, один разсекунды (тикают часы), эта реализация потребовала бы ТОНА работы, чтобы быть практически полезной.

Sample Consumer (asyncio)

Сложность при написании этого правильно состоит в том, что естьнесколько непредсказуемых источников событий (сигналы, входящие данные из канала).asyncio позволяет вам выражать свой код как сопрограммы, и они могут быть приостановлены и возобновлены, когда Python захочет, но с указанием вами правил.

import asyncio
import aiofiles

class AsyncConsumer:
    def __init__(self, path):
        loop = asyncio.get_event_loop()
        self.path = path
        self.fifo_closed = loop.create_future()
        self.fifo = None

    async def start(self):
        import aiofiles
        self.fifo = await aiofiles.open(self.path, 'r')
        done, pending = await asyncio.wait(
            [self._read_lines(), self.fifo_closed],
            return_when=asyncio.FIRST_COMPLETED)
        print('The consumer is going to be nicely stopped...')
        await self.fifo.close()
        print('The consumer was nicely stopped.')

    async def _read_lines(self):
        try:
            async for line in self.fifo:
                print('Got from the producer:', line)
            print('EOF from the producer.')
        except ValueError:
            # aiofiles raises a `ValueError` when the underlying file is closed
            # from underneath it
            pass

    def stop(self, *args, **kwargs):
        if self.fifo is not None:
            print('we got the message')
            self.fifo_closed.set_result(None)

loop = asyncio.get_event_loop()
consumer = AsyncConsumer('/tmp/someNamedPipe')
loop.add_signal_handler(signal.SIGINT, consumer.stop)
loop.add_signal_handler(signal.SIGTERM, consumer.stop)
loop.run_until_complete(consumer.start())

Метод async start() запускает два потоков потоков дел работы: один, который читает строки одна за другой по мере их поступления, а другой, по существу, зависает до получения сигнала.Это происходит, когда ЛИБО из этих двух вещей завершается.

К сожалению, я заметил, что в конечном итоге aiofiles опирается на реализацию блокировки под капотом, потому что метод await self.fifo.close() все еще зависает, если выполняется read().Но, по крайней мере, есть место для размещения вашего кода.

Завершение его

В конечном счете, не существует отличного готового решения для решения вашей проблемы, но, надеюсь,один из этих вариантов может помочь вам решить вашу проблему.

...