Обновить метку Tkinter во время работы с потоками в Python - PullRequest
1 голос
/ 08 февраля 2020

Я сейчас пытаюсь запрограммировать манипулятор робота, которым управляет Raspberry Pi.

Пока все работает нормально, кроме одной вещи, и я уже гуглил и пробовал все в течение многих часов, но не могу найти рабочее решение.

Для движения манипулятора робота необходимо запустить все двигатели «одновременно» с резьбой (работает нормально).

Проблема, с которой я столкнулся, заключается в том, что мне нужно обновите метку, которая показывает текущий угол оси (двигателя), как только она закончила свое движение, но другие двигатели все еще работают (резьбы).

После долгих исследований я подумал, что нашел решение, используя очередь и Tkinters после-метод. Но он все равно не работает, так как текст меток обновляется только после завершения всех потоков.

Я написал пример кода, в котором я хочу получить обновление метки для двигателя "one", которое завершит sh его for-l oop (100 итераций) перед двигателем "два" (500 итераций). Я ожидал, что метка будет обновлена, как только мотор один достигнет своей цели, в то время как мотор два все еще работает.

Но хотя я использовал метод after, он все еще ожидает завершения работы мотора два, прежде чем обновить метку.

Надеюсь, вы поможете мне!

from tkinter import *
import threading
import time
from queue import * 


class StepperMotors:

    def __init__(self, root):
        self.root = root

        self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
        self.start_btn.config(width = 10)
        self.start_btn.grid(row=1,column=1)

        self.label_one = Label(root, text='')
        self.label_one.config(width = 10)
        self.label_one.grid(row=2, column=1)

        self.label_two = Label(root, text='')
        self.label_two.config(width = 10)
        self.label_two.grid(row=3, column=1)

    def start_movement(self):
        self.thread_queue = Queue()
        self.root.after(100, self.wait_for_finish) 

        thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
        thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))

        thread_one.start()
        thread_two.start()

        thread_one.join()
        thread_two.join()

    def motor_actuation(self, motor, iterations):  
        for i in range(iterations):
            i = i+1  
            update_text = str(motor) + " " + str(i) + "\n"
            print(update_text)
            time.sleep(0.01)

        self.thread_queue.put(update_text)

    def wait_for_finish(self):
        try:      
            self.text = self.thread_queue.get()  
            self.label_one.config(text=self.text)  

        except self.thread_queue.empty():
            self.root.after(100, self.wait_for_finish)

if __name__ == "__main__":
    root = Tk()
    root.title("test")
    stepper = StepperMotors(root)
    root.mainloop()

1 Ответ

0 голосов
/ 08 февраля 2020

Лучше использовать не блокирующую нить демона.

Кроме того, лучше иметь разделение интересов : робот (или рука робота) может быть объектом, который имеет свое время жизни: поток демона. В идеале вы можете определить «LabelUpdater», который читает состояние робота и обновляет метку.

Давайте определим робота:

  • он создается при инициализации приложения и запускается когда пользователь нажимает кнопку «Пуск»,
  • Робот перемещается и сообщает свой угол в очереди многопоточности на уровне приложения,
class Robot(threading.Thread):

    def __init__(self, name: str, label_queue: queue.Queue, end_pos: int):
        super().__init__(name=name)
        self.daemon = True
        self.label_queue = label_queue
        self.end_pos = end_pos

    def run(self) -> None:
        for angle in range(self.end_pos):
            self.label_queue.put(angle)
            time.sleep(0.01)

Давайте определим LabelUpdater:

  • Он создается при инициализации приложения и работает вечно (он может наблюдать за роботом, даже если он не работает).
  • Он читает очередь робота (например, каждую секунду, чтобы избежать мигания). ) и обновите метку
class LabelUpdater(threading.Thread):
    def __init__(self, name: str, label_queue: queue.Queue, root_app: tkinter.Tk, variable: tkinter.Variable):
        super().__init__(name=name)
        self.daemon = True
        self.label_queue = label_queue
        self.root_app = root_app
        self.variable = variable

    def run(self) -> None:
        # run forever
        while True:
            # wait a second please
            time.sleep(1)
            # consume all the queue and keep only the last message
            last_msg = None
            while True:
                try:
                    msg = self.label_queue.get(block=False)
                except queue.Empty:
                    break
                last_msg = msg
                self.label_queue.task_done()
            if last_msg:
                self.variable.set(last_msg)

Затем основное приложение должно определить:

  • 2 очереди многопоточности: по одной на каждую метку,
  • 2 tkinter.StringVar переменная, которая будет обновляться,
  • роботы и средства обновления ярлыков,
  • два средства обновления запущены и будут работать вечно.
class StepperMotors:
    def __init__(self, root):
        self.root = root
        self.label_one_queue = queue.Queue()
        self.label_two_queue = queue.Queue()

        self.start_btn = tkinter.Button(root, text="Start", command=lambda: self.start_movement())
        self.start_btn.config(width=10)
        self.start_btn.grid(row=1, column=1)

        self.text_one = tkinter.StringVar()
        self.text_one.set("one")
        self.label_one = tkinter.Label(root, textvariable=self.text_one)
        self.label_one.config(width=10)
        self.label_one.grid(row=2, column=1)

        self.text_two = tkinter.StringVar()
        self.text_two.set("two")
        self.label_two = tkinter.Label(root, textvariable=self.text_two)
        self.label_two.config(width=10)
        self.label_two.grid(row=3, column=1)

        self.robot_one = Robot("robot_one", self.label_one_queue, 100)
        self.robot_two = Robot("robot_two", self.label_two_queue, 500)

        self.updater_one = LabelUpdater("updater_one", self.label_one_queue, self.root, self.text_one)
        self.updater_two = LabelUpdater("updater_two", self.label_two_queue, self.root, self.text_two)
        self.updater_one.start()
        self.updater_two.start()

    def start_movement(self):
        self.robot_one.start()
        self.robot_two.start()

Конечно, вам нужен флаг или что-то, чтобы проверить, что каждый робот еще не запущен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...