Реализация службы потоковой передачи WSGI: (как обнаружить разрывы соединения с клиентом) - PullRequest
5 голосов
/ 04 декабря 2011

Итак, я пишу потоковую службу WSGI, которая использует очередь, завернутую в итератор, для реализации многоадресной передачи.Ниже приведена упрощенная модель сервиса:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
      for client in Processor.connections:
          client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

И это прекрасно работает с витой в качестве сервера WSGI (кроме того, последовательный генератор - это отдельный процесс, подключенный к процессору через процессочередь).У меня вопрос, как я могу определить, когда клиент отключается, и, таким образом, удалить его из очереди?Хотя я добавляю очередь в виде кортежа с клиентским сокетом, т. Е. (Сокет, очередь), а затем проверяю, подключен ли сокет до того, как я выполню установку.Тем не менее, я не знаю точно, что взять из окружающей среды.Есть ли у кого-нибудь опыт работы с этим до того, как я взломал что-нибудь вместе?

Обновлено

Вот решение, с которым я наконец-то пришел:

class IterableQueue(Queue):

def __init__(self):
    Queue.__init__(self) # Queue is an old style class
    ShellProcessor.connections.append(self)

def __iter__(self):
    return iter(self.get, None)

def close(self):
    self.put(None)
    self.task_done()
    ShellProcessor.connections.remove(self)

Ответы [ 2 ]

1 голос
/ 04 декабря 2011

витые вызовы .close() на итераторе, если они присутствуют, когда запрос завершен или прерван.Вы можете сделать что-то вроде:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

, где ResponseIterator может быть:

class ResponseIterator:

  def __init__(self, iterator, on_finish=None):
      self.iterator = iterator
      self.on_finish = on_finish

  def __iter__(self):
      return self

  def next(self):
      return next(self.iterator)

  def close(self):
      if self.on_finish is not None:
         self.on_finish()
0 голосов
/ 04 декабря 2011

Читать:

http://groups.google.com/group/modwsgi/browse_frm/thread/8ebd9aca9d317ac9

Некоторые специфичны для mod_wsgi, но в целом те же проблемы применимы к любому серверу WSGI.

...