Python: как смоделировать передачу широковещательных сообщений между потоками? - PullRequest
0 голосов
/ 09 апреля 2019

Я пишу небольшую параллельную программу на Python 3.6. У меня вопрос: моя программа имеет небольшой класс Thread (который имитирует поток); у этого класса есть 3 метода, которые выполняются как подпотоки:

class myThread(Thread):
  def __init__(self, identifier):
    super(myThread, self).__init__() 

  def fun1(self):
    # broadcasts messages

  def fun2(self):
    # event that occurs when a message arrives
    # do something

  def fun3(self):
    # event that occurs when a message arrives
    # do something

  def run(self):
    t1 = Thread(target = self.fun1)
    t2 = Thread(target = self.fun2)
    t3 = Thread(target = self.fun3)
    t1.start()
    t2.start()
    t3.start()

Как видите, fun1() отправляет широковещательные сообщения (он отправляет объекты), которые должны получить другие 2 потока. Как это можно легко реализовать в Python? Я видел, что самый простой способ - это использовать Queue, но у меня есть некоторые сомнения ... где я должен поставить эту очередь? Как общий метод может использовать представленный объект, не опустошая эту очередь (поскольку «широковещательный» объект должен использоваться другими методами)? Как метод выполняет свое тело каждый раз, когда новый объект добавляется в очередь (как если бы это было событие)?

Большое спасибо всем!

1 Ответ

1 голос
/ 09 апреля 2019

хороший способ связи между потоками - использование очереди. Лучше использовать назначенную очередь для каждого потока. Вот как вы реализуете ее в своем коде:

from queue import Queue
from threading import Thread
import time

# define some queues
fun2_q = Queue()
fun3_q = Queue()

class myThread(Thread):
    def __init__(self, identifier):
        super(myThread, self).__init__() 

    def fun1(self):
        print('starting fun1')

        # broadcasts messages
        fun2_q.put('say something')
        fun3_q.put('say something')

        fun2_q.put('quit')
        fun3_q.put('quit')



    def fun2(self):
        # event that occurs when a message arrives
        # as a listener we should use infinite loop to monitor messages 
        # we will use non blocking way to read the queue using "if", also we can use fun2_q.get_nowait()
        # instead of "if fun2_q.qsize() > 0:" statement

        while True:
            if fun2_q.qsize() > 0:
                msg = fun2_q.get()
                if msg == 'say something':
                    print('fun2 method saying hello')
                elif msg == 'quit':
                    break  # quit thread

            # do other stuff below if no messages coming

            time.sleep(0.1)  # to stop while loop from abusing processor

        print('fun2 terminating')


    def fun3(self):
        # event that occurs when a message arrives
        # we will use a blocking way to read the queue
        while True:

            msg = fun3_q.get() # it will block here waiting for a message to come
            if msg == 'say something':
                print('fun3 method saying hello')
            elif msg == 'quit':
                    break  # quit thread

            # can't do other stuff below if no messages coming, the loop will stuck waiting new message

            # time.sleep(0.1)  # no need for it since the loop will wait anyway

        print('fun3 terminating')

    def run(self):
        t1 = Thread(target = self.fun1)
        t2 = Thread(target = self.fun2)
        t3 = Thread(target = self.fun3)
        t1.start()
        t2.start()
        t3.start()

my_thread = myThread(1)
my_thread.run()

вывод:

starting fun1
fun2 method saying hello
fun3 method saying hello
fun3 terminating
fun2 terminating
...