Запуск событий в витой из другого потока - PullRequest
3 голосов
/ 16 декабря 2011

У меня есть приложение, которое для удобства (я повторно использую существующий код) было разделено на два разных потока:

  • одна нить с запущенным витым реактором
  • другой поток, выполняющий интерактивное меню

Одна из вещей, которую я хочу выполнить из интерактивного меню, - это взаимодействие с реактором. Как только пользователь дает конкретную команду, я хочу вызвать искаженное событие. Вот очень упрощенная версия моего кода:

from   twisted.spread                   import pb
from   twisted.internet                 import reactor
import threading

class TaskGatewaySupport():

    def __init__(self):
        self.object = None
        self.factory = pb.PBClientFactory()
        self.connector = None

    def gotObject(self, object):
        print 'gotObject > %s' % object
        self.object = object
        return object

    def gotData(self, data):
        return data

    def gotNoObject(self, reason):
        print 'gotNoObject > no object: %s' % reason

    def connect(self, task_gateway_host = '127.0.0.1', task_gateway_pb_port = 8889):
        print 'Connecting to %s:%s' % (task_gateway_host, task_gateway_pb_port)
        self.connector=reactor.connectTCP(task_gateway_host, task_gateway_pb_port, self.factory)
        d = self.factory.getRootObject()
        d.addCallbacks(self.gotObject, self.gotNoObject)
        return d

def Menu(task_gateway_support):
    while True:
        print '''

        A) Connect

        '''
        choice = raw_input('Option > ')
        if choice == 'A' : task_gateway_support.connect()
        else             : print "ERR: command not yet supported"

def version1():
    task_gateway_support  = TaskGatewaySupport()
    thread = threading.Thread(target = Menu, args = (task_gateway_support,))
    thread.start()
    reactor.run()

def version2():
    task_gateway_support  = TaskGatewaySupport()
    d = task_gateway_support.connect()
    reactor.run()

if __name__ == '__main__':
    version1()

Как видите, я показываю две разные версии:

  • я хочу запустить version1, но это не
  • version2 имеет только один поток, и он не является интерактивным

Запуск версии2 даст такой результат:

Connecting to 127.0.0.1:8889
gotObject > <twisted.spread.pb.RemoteReference instance at 0x88e734c>

Что я и ожидал.

Запуск версии 1 даст это:

        A) Connect


Option > A
Connecting to 127.0.0.1:8889


        A) Connect


Option > ^CgotNoObject > no object: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectError'>: An error occurred while connecting: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
].
]

Что я здесь делаю, так это выбираю опцию A, и, поскольку ничего не происходит, я нажимаю ^ C, которая показывает сообщение об ошибке.

Я думаю, что проблемы возникают из-за того, что я делю объект в двух разных потоках и пытаюсь вызвать витые события из не скрученного потока. Я надеялся, что, поскольку объект является общим, реактор будет знать о том, что что-либо происходит с ним.

Итак, мой главный вопрос: как я могу запускать витые события из другого потока?

Ответы [ 2 ]

2 голосов
/ 16 декабря 2011

Вам не следует использовать темы для этого. См. Взаимодействие с пользователем в витом процессе для получения информации о том, как принять пользовательский ввод в одном потоке.

Кроме того, используйте processor.callFromThread каждый раз, когда вы хотите вызвать любой Twisted API из потока без реактора.

0 голосов
/ 28 июня 2013

Я сам столкнулся с этой проблемой с помощью Twisted. К счастью, после долгих поисков я смог придумать этот ответ, на самом деле работает довольно хорошо! -

def my_function(s):
    do_something_with_s

class GetCommands():
def start(self, callable):
    self.callable = callable
    self.startReceiving()

def startReceiving(self, s = ''):
    self.callable(s)
    if s != 'exit':
        threads.deferToThread(raw_input,' >>> ').addCallback(self.startReceiving)

Тогда в основном -

getCmds = GetCommands()
reactor.callWhenRunning(getCmds.start, my_function)

reactor.listenTCP(PORT, factory)
reactor.run()
...