Python: Queue.get () блокирует код от проверки соединения с сокетом - PullRequest
0 голосов
/ 10 марта 2011

Я запускаю поток, который захватывает сообщение из очереди, отправляет его клиенту и получает подтверждение. Если клиент отключается, поток ловит ошибку сокета и завершает работу. Проблема в том, что если msgQ пуст, поток никогда не проверяет соединение с сокетом. Есть ли способ установить этот код, чтобы, даже если очередь пуста, сокет проверен? (проблема в том, что без сообщения отправлять нечего)

Нужно ли отправлять специальный пинг? Понг! сообщение, если msgQ пусто (и проверьте на клиенте, является ли сообщение данными журнала или пингом?)? Любая помощь будет оценена.

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty: pass

Ответы [ 3 ]

1 голос
/ 10 марта 2011

Если msgQ пусто, то вызов self.msgQ.get() вызывает исключение Empty и полностью пропускает вызовы self.sock.send() и self.sock.recv().Ваш обработчик исключения для Empty ничего не делает, поэтому ваш код будет занят ждать до тех пор, пока что-то не появится в msgQ, не вызывая send или recv.

OneВозможное решение - использовать модуль Python select для проверки сокета в вашем обработчике исключений.Что-то вроде этого:

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty:
            results = select.select([], [], [self.sock], 0.5) # timeout of 0.5 seconds
            if self.sock in results[2]:
                print 'exceptional condition on socket'
                self.terminate()
                break
0 голосов
/ 15 мая 2018

У меня появилась идея, как обернуть очередь и трубу в класс. И затем использование модуля выбора для проверки сокета и канала не будет использовать опрос для мониторинга события.

Следующий пример кода для переноса очереди и конвейера:

class MyQueue(object):
"""docstring for MyQueue"""
def __init__(self, arg):
    super(MyQueue, self).__init__()
    # self.arg = arg
    self._queue = Queue.Queue()
    self._rdfd, self._wrfd = os.pipe()
    return

def enQ(self, item=None, block=True, timeout=None):
    os.write(self._wrfd, '+')
    self._queue.put(item)
    return

def deQ(self, block=True, timeout=None):
    _itm = self._queue.get(block=block, timeout=timeout)
    _msg = os.read(self._rdfd, 1)
    return _itm

def get_notify(self):
    return self._rdfd

    print 'append queue'
    self.inp.append(self._myq.get_notify())
    print 'append socket'
    self.inp.append(self.sock)
    print self.inp

    _slp = self.slp
    _inp = self.inp
    _out = self.out
    _err = self.err
    _in, _out, _err = select.select(_inp, _out, _err, _slp)
0 голосов
/ 10 марта 2011

Если вы используете Queue.Queue, вы можете позвонить Queue.get() с необязательными аргументами block и timeout или вызвать Queue.get_nowait(), что эквивалентно Queue.get(block=False) и немедленно вернется, если очередь пуста:

try:
    # wait for 1/10 second then return
    msgs = self.msgQ.get(timeout=0.1)
except Queue.Empty, qe:
    # handle empty queue 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...