Почему Thrift вызывает AssertionError при запросе несколькими потоками? - PullRequest
0 голосов
/ 08 мая 2018

Я строю своего рода симулятор, который использует протокол Thrift.Но когда я выполняю несколько потоков моего виртуального оборудования, отправляющего сообщения, программа через короткое время прерывает работу, принимая их, думает, что буфер перегружен или что-то в этом роде, или нет, поэтому я прошу помощи, если это возможно.

Вот основные части моего кода

Класс для работы с потоками:

class ThreadManager (threading.Thread):    
   def __init__(self, name, obj, client, layout):
      threading.Thread.__init__(self)
      self.name = name
      self.obj = obj
      self.client = client
      self.layout = layout


   def run(self):
       print ("Starting " + self.name)           
       while True:
           sleep(2)
           self.obj.auto_gen_msg(self.client, layout=self.layout)

Метод генерации сообщений:

def auto_gen_msg(self, client, layout='',  min_delay=15, max_delay=30):

        if not layout:
            msg = self.gen_message(self.draw_random_model())
        else:
            msg = self.gen_message(layout)
        wait = randint(min_delay, max_delay)
        sleep(wait)

        print(self.eqp_type, " delivered a message ...")
        getattr(client, msg[0])(*msg[1])

Основной:

def start(layout, equipment, number):

    try:
        host = 'localhost'

        transport = TSocket.TSocket(host, port=9090)

        transport = TTransport.TBufferedTransport(transport)

        protocol = TCompactProtocol.TCompactProtocol(transport)

        client = SuiteService.Client(protocol)

        transport.open()

        equips = [Equipment(equipment) for i in range(number)]

        threads = [ThreadManager(i.eqp_type, i, client, layout) for i in equips]

        for i in range(len(threads)):
            threads[i].start()
            sleep(2)

        while True:
            pass


        transport.close() 

    except Thrift.TException as tx:
        print ("%s " % (tx.message))

Ошибка преследует меня:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/thread_manager.py", line 39, in run
    self.obj.auto_gen_msg(self.client, layout=self.layout)
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/virtual.py", line 281, in auto_gen_msg
    getattr(client, msg[0])(*msg[1])
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4895, in v1
    self.send_v1(ir, ts, ch, o1, o2, o3, o4, o5, o6, o7)
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4899, in send_v1
    self._oprot.writeMessageBegin('v1', TMessageType.CALL, self._seqid)
  File "/Users/lem4fia/Documents/sg/loki/lokiv/lib/python3.6/site-packages/thrift-0.11.0-py3.6-macosx-10.6-intel.egg/thrift/protocol/TCompactProtocol.py", line 156, in writeMessageBegin
    assert self.state == CLEAR
AssertionError

Любопытно, что это не приводит к ошибке, если инстанцирование 2 виртуальных устройств в потоке, но 10 виртуальных устройств (иногда меньше этого) достаточно дляподнять эту ошибку.

Может кто-нибудь, пожалуйста, дай мне свет?:)

Ответы [ 2 ]

0 голосов
/ 09 мая 2018

Как правило 1) , Thrift не предназначен для использования в нескольких потоках.

Это, по крайней мере, насколько мне известно, верно для всех поддерживаемых в настоящее время языков.

Подойдет один экземпляр для каждого потока.


1) помимо серверных вещей, таких как TThreadedServer или TThreadPoolServer

0 голосов
/ 08 мая 2018

Проблема в том, что вам кажется, что вам нужно использовать один и тот же транспортный объект для каждого потока. Вероятно, это связано с реализацией Thrift!

Ссылка здесь: http://grokbase.com/t/thrift/user/134s16ks4m/single-connection-and-multiple-threads

...