Asyncore Python для периодической отправки данных с использованием переменной времени ожидания. Есть ли способ лучше? - PullRequest
8 голосов
/ 24 июня 2009

Я хотел написать сервер, к которому клиент мог бы подключаться и получать периодические обновления без необходимости опроса. Проблема, с которой я столкнулся при работе с asyncore, заключается в том, что если вы не возвращаете true при вызове dispatcher.writable (), вам придется подождать, пока не истечет тайм-аут asyncore.loop (по умолчанию 30 с).

Два способа, которые я пытался обойти, это 1) сократить время ожидания до низкого значения или 2) запросить соединения, когда они будут в следующий раз обновляться и генерировать адекватное значение времени ожидания. Однако, если вы ссылаетесь на «Select Law» в «man 2 select_tut», он гласит: «Вы всегда должны пытаться использовать select () без тайм-аута».

Есть ли лучший способ сделать это? Витая может быть? Я хотел попытаться избежать лишних тем. Я включу пример переменной времени ожидания здесь:

#!/usr/bin/python

import time
import socket
import asyncore


# in seconds
UPDATE_PERIOD = 4.0

class Channel(asyncore.dispatcher):

    def __init__(self, sock, sck_map):
        asyncore.dispatcher.__init__(self, sock=sock, map=sck_map)
        self.last_update = 0.0  # should update immediately
        self.send_buf = ''
        self.recv_buf = ''

    def writable(self):
        return len(self.send_buf) > 0

    def handle_write(self):
        nbytes = self.send(self.send_buf)
        self.send_buf = self.send_buf[nbytes:]

    def handle_read(self):
        print 'read'
        print 'recv:', self.recv(4096)

    def handle_close(self):
        print 'close'
        self.close()

    # added for variable timeout
    def update(self):
        if time.time() >= self.next_update():
            self.send_buf += 'hello %f\n'%(time.time())
            self.last_update = time.time()

    def next_update(self):
        return self.last_update + UPDATE_PERIOD


class Server(asyncore.dispatcher):

    def __init__(self, port, sck_map):
        asyncore.dispatcher.__init__(self, map=sck_map)
        self.port = port
        self.sck_map = sck_map
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind( ("", port))
        self.listen(16)
        print "listening on port", self.port

    def handle_accept(self):
        (conn, addr) = self.accept()
        Channel(sock=conn, sck_map=self.sck_map)

    # added for variable timeout
    def update(self):
        pass

    def next_update(self):
        return None


sck_map = {}

server = Server(9090, sck_map)
while True:
    next_update = time.time() + 30.0
    for c in sck_map.values():
        c.update()  # <-- fill write buffers
        n = c.next_update()
        #print 'n:',n
        if n is not None:
            next_update = min(next_update, n)
    _timeout = max(0.1, next_update - time.time())

    asyncore.loop(timeout=_timeout, count=1, map=sck_map)

Ответы [ 5 ]

4 голосов
/ 24 апреля 2010

Может быть, вы можете сделать это с sched.scheduler, вот так (n.b. не тестировалось):

import sched, asyncore, time

# Create a scheduler with a delay function that calls asyncore.loop
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time()) )

# Add the update timeouts with scheduler.enter
# ...

def _poll_loop(timeout, start_time):  
  asyncore.loop(timeout, count=1)
  finish_time = time.time()
  timeleft = finish_time - start_time
  if timeleft > timeout:  # there was a message and the timeout delay is not finished
    _poll_loop(timeleft, finish_time) # so wait some more polling the socket

def main_loop():
  while True:
    if scheduler.empty():
      asyncore.loop(30.0, count=1) # just default timeout, use what suits you
      # add other work that might create scheduled events here
    else:
      scheduler.run()
4 голосов
/ 24 июня 2009

«Право выбора» не применимо к вашему случаю, так как вы выполняете не только действия, инициируемые клиентом (на чистом сервере), но и действия, инициируемые временем - это как раз то, для чего предназначен тайм-аут выбора. Закон действительно должен сказать: «если вы указываете тайм-аут, убедитесь, что вам действительно нужно сделать что-то полезное, когда истечет тайм-аут». Закон предназначен для защиты от занятого ожидания; ваш код не занят - подождите.

Я бы не установил для _timeout максимум 0,1 и время следующего обновления, но максимум 0,0 и следующий тайм-аут. IOW, если период обновления истек, когда вы выполняли обновления, вы должны сделать это конкретное обновление прямо сейчас.

Вместо того, чтобы спрашивать каждый канал каждый раз, хочет ли он обновить, вы можете сохранить все каналы в приоритетной очереди (отсортированной по времени следующего обновления), а затем запускать обновление только для самых ранних каналов (пока не найдете тот, чье обновление время не пришло). Для этого вы можете использовать модуль heapq.

Вы также можете сохранить несколько системных вызовов, не запрашивая у каждого канала текущее время, а только один раз опрашивая текущее время и передавая его в .update.

2 голосов
/ 10 февраля 2011

Это в основном решение Демиурга с округлыми неровными краями. Это сохраняет его основную идею, но предотвращает RuntimeErrors и занятые циклы и проверяется. [Изменить: устранены проблемы с изменением планировщика во время _delay]

class asynschedcore(sched.scheduler):
    """Combine sched.scheduler and asyncore.loop."""
    # On receiving a signal asyncore kindly restarts select. However the signal
    # handler might change the scheduler instance. This tunable determines the
    # maximum time in seconds to spend in asycore.loop before reexamining the
    # scheduler.
    maxloop = 30
    def __init__(self, map=None):
        sched.scheduler.__init__(self, time.time, self._delay)
        if map is None:
            self._asynmap = asyncore.socket_map
        else:
            self._asynmap = map
        self._abort_delay = False

    def _maybe_abort_delay(self):
        if not self._abort_delay:
            return False
        # Returning from this function causes the next event to be executed, so
        # it might be executed too early. This can be avoided by modifying the
        # head of the queue. Also note that enterabs sets _abort_delay to True.
        self.enterabs(0, 0, lambda:None, ())
        self._abort_delay = False
        return True

    def _delay(self, timeout):
        if self._maybe_abort_delay():
            return
        if 0 == timeout:
            # Should we support this hack, too?
            # asyncore.loop(0, map=self._asynmap, count=1)
            return
        now = time.time()
        finish = now + timeout
        while now < finish and self._asynmap:
            asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap,
                          count=1)
            if self._maybe_abort_delay():
                return
            now = time.time()
        if now < finish:
            time.sleep(finish - now)

    def enterabs(self, abstime, priority, action, argument):
        # We might insert an event before the currently next event.
        self._abort_delay = True
        return sched.scheduler.enterabs(self, abstime, priority, action,
                                        argument)

    # Overwriting enter is not necessary, because it is implemented using enter.

    def cancel(self, event):
        # We might cancel the next event.
        self._abort_delay = True
        return sched.scheduler.cancel(self, event)

    def run(self):
        """Runs as long as either an event is scheduled or there are
        sockets in the map."""
        while True:
            if not self.empty():
                sched.scheduler.run(self)
            elif self._asynmap:
                asyncore.loop(self.maxloop, map=self._asynmap, count=1)
            else:
                break
1 голос
/ 24 июня 2009

Я бы использовал Twisted, долгое время с тех пор, как я использовал асинхронное ядро, но я думаю, что это должен быть витой эквивалент (не проверенный, записанный из памяти):

from twisted.internet import reactor, protocol
import time

UPDATE_PERIOD = 4.0

class MyClient(protocol.Protocol):

    def connectionMade(self):
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)

    def connectionLost(self, reason):
        self.updateCall.cancel()

    def update(self):
        self.transport.write("hello %f\n" % (time.time(),))

    def dataReceived(self, data):
        print "recv:", data


f = protocol.ServerFactory()
f.protocol = MyClient

reactor.listenTCP(9090, f)
reactor.run()
0 голосов
/ 10 июля 2011

Может быть, я не понимаю, что пытался выполнить OP, но я просто решил эту проблему, используя 1 поток, который получает слабую ссылку для каждого объекта Channel (asyncore.dispatcher). Этот поток определяет собственное время и будет периодически отправлять каналу обновление, используя очередь в этом канале. Он получает Очередь от объекта Channel, вызывая getQueue.

Причина, по которой я использую слабую ссылку, заключается в том, что клиенты временные. Если канал умирает, то слабый ответ возвращает None. Таким образом, поток синхронизации не сохраняет старые объекты живыми, потому что он ссылается на них.

Я знаю, что ОП хотел избежать потоков, но это решение очень простое. Он создает только один поток и обращается к любым каналам, которые создаются, когда объект Server добавляет их в список потоков для отслеживания объектов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...