Вот два возможных решения:
1) Периодически опрашивайте ваше местное приложение, чтобы узнать, есть ли у вас дополнительные данные для отправки.
NB. Это основывается на периодическом асинхронном обратном вызове из метода deferLater в витой. Если вам нужно отзывчивое приложение, которое отправляет данные по требованию, или длительная блокирующая операция (например, пользовательский интерфейс, который использует свой собственный цикл обработки событий), это может не подойти.
Код:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time
# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
while True:
protocol.send(b"Hello World\n")
yield deferLater(reactor, 2, lambda: None)
# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):
def connectionMade(self):
self.transport.registerProducer(self, True)
gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
self.task = cooperate(gen)
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def pauseProducing(self):
print 'Workload paused'
self.task.pause()
def resumeProducing(self):
print 'Workload resumed'
self.task.resume()
def stopProducing(self):
print 'Workload stopped'
self.task.stop()
def connectionLost(self, reason):
print 'Connection lost'
try:
self.task.stop()
except:
pass
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
2) Вручную отследите экземпляры протокола и используйте реактор.callFromThread () из другого потока. Позволяет вам избежать длительной операции блокировки в другом потоке (например, цикл событий пользовательского интерфейса).
Код:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading
# Connection
protocol = None
# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
def run(self):
while True:
print("Thread loop")
time.sleep(random.randint(0, 4))
if protocol is not None:
reactor.callFromThread(self.dispatch)
def dispatch(self):
global protocol
protocol.send("Hello World\n")
# Push protocol
class PushProtocol(Protocol):
def connectionMade(self):
global protocol
protocol = self
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def connectionLost(self, reason):
print 'Connection lost'
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Start thread
other = SomeThread()
other.start()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
Лично я нахожу тот факт, что IPushProducer и IPullProducer требуют периодического обратного вызова, что делает их менее полезными. Другие не согласны ... пожимает плечами . Выбирай.