Правильный способ отмены приема и закрытия подключения к обработчику / многопроцессорному прослушивателю Python - PullRequest
11 голосов
/ 11 декабря 2008

(в этом примере я использую модуль pyprocessing , но замена обработки на многопроцессорную работу, вероятно, будет работать, если вы запустите python 2.6 или используете многопроцессорный бэкпорт )

В настоящее время у меня есть программа, которая слушает сокет Unix (используя processing.connection.Listener), принимает соединения и создает поток, обрабатывающий запрос. В определенный момент я хочу изящно завершить процесс, но так как вызов accept () блокирует, и я не вижу способа отменить его хорошим способом. У меня есть один способ, который работает здесь (OS X) по крайней мере, установить обработчик сигнала и сигнализировать процесс из другого потока, например, так:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

Единственный другой способ, о котором я подумал, - это проникнуть глубоко в соединение (listener._listener._socket) и установить параметр неблокирования ... но это, вероятно, имеет некоторые побочные эффекты и, как правило, действительно страшно.

Есть ли у кого-нибудь более элегантный (и, возможно, даже правильный!) Способ сделать это? Он должен быть переносимым на OS X, Linux и BSD, но переносимость Windows и т. Д. Не требуется.

Разъяснение : Спасибо всем! Как обычно, в моем первоначальном вопросе обнаруживаются неясности:)

  • Мне нужно выполнить очистку после отмены прослушивания, и я не всегда хочу на самом деле выйти из этого процесса.
  • Мне нужно иметь доступ к этому процессу из других процессов, не порожденных тем же родителем, что делает очереди громоздкими
  • Причины для потоков:
    • Они получают доступ к общему состоянию. На самом деле более или менее распространенная база данных в памяти, так что, я полагаю, это можно сделать по-другому.
    • Я должен иметь возможность принимать несколько соединений одновременно, но фактические потоки блокируют что-то большую часть времени. Каждое принятое соединение порождает новый поток; это для того, чтобы не блокировать всех клиентов на операциях ввода / вывода.

Что касается потоков и процессов, я использую потоки для того, чтобы запретить операции блокировки, а процессы для обеспечения многопроцессорности.

Ответы [ 5 ]

3 голосов
/ 21 января 2009

Я думал, что смогу избежать этого, но, похоже, мне нужно сделать что-то вроде этого:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

Я знаю, что это нарушает закон деметры и вводит некоторые злые помехи для обезьян, но, похоже, это будет самый простой способ сделать это. Если у кого-нибудь есть более элегантное решение, я буду рад его услышать:)

3 голосов
/ 10 января 2009

Разве это не то, что выбрать для ??

Принимать вызовы только через сокет, если select указывает, что он не будет блокироваться ...

У выбора есть тайм-аут, поэтому вы можете время от времени выходить из строя, чтобы проверить если пора выключаться ....

1 голос
/ 11 декабря 2008

Я новичок в модуле многопроцессорной обработки, но мне кажется, что смешивание модуля обработки и модуля многопоточности нелогично, не направлены ли они на решение одной и той же проблемы?

В любом случае, как насчет упаковки ваших функций прослушивания в сам процесс? Мне не ясно, как это влияет на остальную часть вашего кода, но это может быть более чистой альтернативой.

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'
0 голосов
/ 02 июня 2018

Я столкнулся с той же проблемой. Я решил это, отправив команду «стоп» слушателю. В главном потоке слушателя (который обрабатывает входящие сообщения) каждый раз, когда поступает новое сообщение, я просто проверяю, является ли это командой «стоп», и выхожу из основного потока.

Вот код, который я использую:

def start(self):
    """
    Start listening
    """
    # set the command being executed
    self.command = self.COMMAND_RUN

    # startup the 'listener_main' method as a daemon thread
    self.listener = Listener(address=self.address, authkey=self.authkey)
    self._thread = threading.Thread(target=self.listener_main, daemon=True)
    self._thread.start()

def listener_main(self):
    """
    The main application loop
    """

    while self.command == self.COMMAND_RUN:
        # block until a client connection is recieved
        with self.listener.accept() as conn:

            # receive the subscription request from the client
            message = conn.recv()

            # if it's a shut down command, return to stop this thread
            if isinstance(message, str) and message == self.COMMAND_STOP:
                return

            # process the message

def stop(self):
    """
    Stops the listening thread
    """
    self.command = self.COMMAND_STOP
    client = Client(self.address, authkey=self.authkey)
    client.send(self.COMMAND_STOP)
    client.close()

    self._thread.join()

Я использую ключ аутентификации, чтобы хакеры не закрывали мой сервис, отправляя команду остановки с произвольного клиента.

Шахта не идеальное решение. Кажется, лучшим решением может быть пересмотр кода в multiprocessing.connection.Listener и добавление метода stop(). Но для этого потребуется отправить его через процесс для утверждения командой Python.

0 голосов
/ 11 декабря 2008

Возможно, не идеально, но вы можете освободить блок, отправив сокету некоторые данные из обработчика сигнала или потока, завершающего процесс.

РЕДАКТИРОВАТЬ: другим способом реализации этого может быть использование Connection Очереди , поскольку они, похоже, поддерживают тайм-ауты (извините, я неправильно прочитал ваш код при первом чтении). 1007 *

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...