TL; DR Вам нужно неблокирующее чтение , чтобы иметь возможность контролировать завершение; asyncio с использованием aiofiles , вероятно, является наиболее элегантным решением, но у всех есть свои особенности.
Sample Producer
Я собираюсь начать скак можно было бы написать хорошо себя ведущий производитель данных в именованный канал, потому что это более простой способ ввести некоторые API.
import os
from threading import Event
class Producer:
def __init__(self, path):
self.path = path
self.event = Event()
def start(self):
os.mkfifo(self.path)
try:
print('Waiting for a listener...')
with open(self.path, 'w') as fifo:
fifo.write('Starting the convoluted clock...\n')
fifo.flush()
while not self.event.wait(timeout=1):
print('Writing a line...')
fifo.write(str(datetime.now()) + '\n')
fifo.flush()
fifo.write('The convoluted clock has finished.\n')
fifo.flush()
print('Finished.')
finally:
os.unlink(self.path)
def stop(self, *args, **kwargs):
self.event.set()
producer = Producer('/tmp/someNamedPipe')
signal.signal(signal.SIGINT, producer.stop)
signal.signal(signal.SIGTERM, producer.stop)
producer.start()
Это записывает текущую дату вименованный канал в виде строки один раз в секунду.SIGINT
и SIGTERM
будут корректно отключать канал, записывая The convoluted clock has finished.
в качестве последней строки в канал перед закрытием.Он использует threading.Event
для связи между методом stop
(который будет выполняться в фоновом потоке) и start
(который ждет не более одной секунды, прежде чем перейти к следующей итерации цикла).self.event.wait(timeout=1)
немедленно возвращает True
, если сигнал установлен, или False
после ожидания не более одной секунды без устанавливаемого сигнала.
Sample (Buggy) Consumer
Было бы заманчиво использовать подобную технику для написания потребителя:
import signal, os
from threading import Event
class BuggyConsumer:
def __init__(self, path):
self.path = path
self.event = Event()
def start(self):
with open(self.path, 'r') as fifo:
# we'll be a bit more aggressive on checking for termination
# because we could have new data for us at any moment!
while not self.event.wait(0.1):
print('Got from the producer:', fifo.readline())
print('The consumer was nicely stopped.')
# technically the pipe gets closed AFTER this print statement
# because we're using a with block
finally:
fifo.close()
def stop(self, *args, **kwargs):
self.event.set()
consumer = BuggyConsumer('/tmp/someNamedPipe')
signal.signal(signal.SIGINT, consumer.stop)
signal.signal(signal.SIGTERM, consumer.stop)
consumer.start()
К сожалению, на практике это не сработает, потому что open()
открывает файлы в режиме блокировки ,Это означает, что read()
вызовы блокируют вызывающий поток, что по существу предотвращает «хорошие» прерывания, если вы не проверяете промежуточные вызовы read
.Конкретно, если производитель прекратит производство, но оставит трубу открытой, потребитель будет вечно сидеть на fifo.readline()
и никогда не сможет проверить сигнал на «хорошее» завершение.
Образец (меньше ошибок) Потребитель
В этом примере устраняется проблема, связанная с неправильным поведением производителя, который ловит потребителя в блокирующем вызове чтения, но он значительно сложнее и вынуждает вас использовать API-интерфейсы более низкого уровня, которые не так дружелюбны:
import signal, os
from threading import Event
class ComplicatedConsumer:
def __init__(self, path):
self.path = path
self.event = Event()
def start(self):
# Open a file descriptor in a non-blocking way.
fifo = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
try:
while not self.event.wait(0.1):
try:
# This is FAR from a comprehensive implementation.
# We're making some pretty yucky assumptions.
line = os.read(fifo, 1000).decode('utf8')
if line:
print('Got from the producer:', line)
else:
print('EOF from the producer.')
break
except BlockingIOError:
# the call to os.read would have blocked (meaning we're
# caught up)
pass
print('The consumer was nicely stopped.')
finally:
os.close(fifo)
def stop(self, *args, **kwargs):
self.event.set()
A правильная реализация была бы FAR более сложной, потому что этот код наивно предполагает, что:
- каждый
read()
вызов из канала представляет собой одно полное сообщение;это худшее предположение.Вы можете ускорить работу производителя и увидеть, что этот менее ошибочный потребитель начинает читать несколько «строк» как одну строку. - строка никогда не занимает более 1000 байтов;более продвинутая реализация должна была бы буферизовать «частичные» сообщения, искать новые строки и делить их соответственно
Во всех случаях, кроме самых простых и медленных вариантов использования (как, скажем, один разсекунды (тикают часы), эта реализация потребовала бы ТОНА работы, чтобы быть практически полезной.
Sample Consumer (asyncio
)
Сложность при написании этого правильно состоит в том, что естьнесколько непредсказуемых источников событий (сигналы, входящие данные из канала).asyncio
позволяет вам выражать свой код как сопрограммы, и они могут быть приостановлены и возобновлены, когда Python захочет, но с указанием вами правил.
import asyncio
import aiofiles
class AsyncConsumer:
def __init__(self, path):
loop = asyncio.get_event_loop()
self.path = path
self.fifo_closed = loop.create_future()
self.fifo = None
async def start(self):
import aiofiles
self.fifo = await aiofiles.open(self.path, 'r')
done, pending = await asyncio.wait(
[self._read_lines(), self.fifo_closed],
return_when=asyncio.FIRST_COMPLETED)
print('The consumer is going to be nicely stopped...')
await self.fifo.close()
print('The consumer was nicely stopped.')
async def _read_lines(self):
try:
async for line in self.fifo:
print('Got from the producer:', line)
print('EOF from the producer.')
except ValueError:
# aiofiles raises a `ValueError` when the underlying file is closed
# from underneath it
pass
def stop(self, *args, **kwargs):
if self.fifo is not None:
print('we got the message')
self.fifo_closed.set_result(None)
loop = asyncio.get_event_loop()
consumer = AsyncConsumer('/tmp/someNamedPipe')
loop.add_signal_handler(signal.SIGINT, consumer.stop)
loop.add_signal_handler(signal.SIGTERM, consumer.stop)
loop.run_until_complete(consumer.start())
Метод async start()
запускает два потоков потоков дел работы: один, который читает строки одна за другой по мере их поступления, а другой, по существу, зависает до получения сигнала.Это происходит, когда ЛИБО из этих двух вещей завершается.
К сожалению, я заметил, что в конечном итоге aiofiles
опирается на реализацию блокировки под капотом, потому что метод await self.fifo.close()
все еще зависает, если выполняется read()
.Но, по крайней мере, есть место для размещения вашего кода.
Завершение его
В конечном счете, не существует отличного готового решения для решения вашей проблемы, но, надеюсь,один из этих вариантов может помочь вам решить вашу проблему.