Нерегулярная проблема передачи с Python Twisted Push Producer - PullRequest
2 голосов
/ 19 сентября 2010

Я хочу передать данные из очереди, используя Twisted.В настоящее время я использую push-поставщик для опроса очереди на предмет и записи в транспорт.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

Проблема в том, что данные отправляются очень нерегулярно, и если в очереди был только один элемент, данныеникогда не будет отправлено.Кажется, что Twisted ждет, пока передаваемые данные не увеличатся до определенного значения, пока не передадут их. Правильно ли я реализовал моего продюсера?Могу ли я заставить Twisted передавать данные сейчас ?

Я также пытался использовать пулл-продюсер, но Twisted вообще не вызывает его метод resumeProducing().Должен ли я вызывать метод resumeProducer() извне, когда используется источник извлечения?

Ответы [ 2 ]

2 голосов
/ 19 сентября 2010

Трудно сказать, почему ваш продюсер плохо работает, не видя полного примера (то есть, не видя также код, который регистрирует его у потребителя, и код, который помещает элементы в эту очередь).

Однако одна проблема, с которой вы, вероятно, столкнетесь, заключается в том, что если ваша очередь пуста при вызове resumeProducing, то вы вообще не будете писать байтов потребителю. И когда элементы помещаются в очередь, они будут оставаться там вечно, потому что потребитель не собирается снова вызывать ваш resumeProducing метод.

И это распространяется на любой другой случай, когда в очереди недостаточно данных, чтобы потребитель вызывал pauseProducing для вашего производителя. Как продюсер, ваша задача продолжать производить данные самостоятельно, пока потребитель не позвонит по номеру pauseProducing (или stopProducing).

В данном конкретном случае это, вероятно, означает, что всякий раз, когда вы собираетесь что-то поместить в эту очередь, - остановитесь: проверьте, не остановился ли производитель, а если нет, запишите его потребителю вместо . Только помещать элементы в очередь, когда продюсер поставлен на паузу.

0 голосов
/ 26 июня 2013

Вот два возможных решения:

1) Периодически опрашивайте ваше местное приложение, чтобы узнать, есть ли у вас дополнительные данные для отправки.

NB. Это основывается на периодическом асинхронном обратном вызове из метода deferLater в витой. Если вам нужно отзывчивое приложение, которое отправляет данные по требованию, или длительная блокирующая операция (например, пользовательский интерфейс, который использует свой собственный цикл обработки событий), это может не подойти.

Код:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) Вручную отследите экземпляры протокола и используйте реактор.callFromThread () из другого потока. Позволяет вам избежать длительной операции блокировки в другом потоке (например, цикл событий пользовательского интерфейса).

Код:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

Лично я нахожу тот факт, что IPushProducer и IPullProducer требуют периодического обратного вызова, что делает их менее полезными. Другие не согласны ... пожимает плечами . Выбирай.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...