Threading с Python - PullRequest
       27

Threading с Python

2 голосов
/ 22 февраля 2020

complete python newb ie ... Я работаю с пакетом Arduino pyfirmata, и я пытаюсь сделать что-то довольно простое.

В зависимости от ввода пользователя в python, я хочу светодиод горит sh или нет.

Моя проблема в том, что программа python запрашивает ввод пользователя только один раз, но я бы хотел, чтобы он всегда запрашивал ввод, чтобы пользователь мог изменить функцию в в любое время.

Я пытался использовать пакет потоков, но безуспешно ... Возможно, есть более простой способ, но я совершенно новичок в кодировании, поэтому не знаю других. Открыт для предложений !!

Вот мой код,

import pyfirmata
import threading
import time

board = pyfirmata.Arduino('/dev/cu.usbmodem14101')

def flash():
    for i in range(1000):
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

def stop():
    board.digital[13].write(0)


while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=flash(), args=(1,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        x = threading.Thread(target=stop(), args=(1,))
        x.start()
        #x.join()

Ответы [ 3 ]

0 голосов
/ 23 февраля 2020

Вы получили ошибку в коде.

Вы должны создать поток с помощью:

x = threading.Thread(target=flash)

Примечание: Вы дали введенный 'fla sh ()', поэтому выполняете Метод в основной теме. А также у вашего метода нет аргументов, поэтому вы можете удалить значения args

0 голосов
/ 23 февраля 2020

Вы можете сделать это объектно-ориентированным способом, создав собственный подкласс Thread, например, класс Flasher ниже.

Одним из преимуществ этого подхода является то, что он будет относительно простым расширить класс Flasher и заставить его управлять светодиодами, подключенными к разным выходам, или разрешить указание задержки между вспышками во время создания. Выполнение первого позволит одновременно запускать несколько экземпляров.

import pyfirmata
import threading
import time

OFF, ON = False, True

class Flasher(threading.Thread):
    DELAY = 1

    def __init__(self):
        super().__init__()
        self.daemon = True
        self.board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
        self.flashing = False
        self.LED_state = OFF

    def turn_LED_on(self):
        self.board.digital[13].write(1)
        self.LED_state = ON

    def turn_LED_off(self):
        self.board.digital[13].write(0)
        self.LED_state = OFF

    def run(self):
        while True:
            if self.flashing:
                if self.LED_state == ON:
                    self.turn_LED_off()
                else:
                    self.turn_LED_on()
            time.sleep(self.DELAY)

    def start_flashing(self):
        if self.LED_state == OFF:
            self.turn_LED_on()
        self.flashing = True

    def stop_flashing(self):
        if self.LED_state == ON:
            self.turn_LED_off()
        self.flashing = False


flasher = Flasher()
flasher.start()

while True:
    runMode = input("Run or Stop? ").strip().lower()

    if runMode == "run":
        flasher.start_flashing()
    elif runMode == "stop":
        flasher.stop_flashing()
    else:
        print('Unknown response ignored')
0 голосов
/ 22 февраля 2020

Если вы хотите просто уничтожить поток, вы можете использовать многопроцессорную обработку, которую может выполнять многопроцессорная обработка. Процесс может p.terminate ()

p = Process(target=flash, args=(,))
while True:
    runMode = input("Run or Stop? ")
    if runMode == "Run":
        p.start()
    elif runMode == "Stop":
        p.terminate()

Однако не рекомендуется просто уничтожать потоки, поскольку это может привести к ошибкам, если процесс обрабатывает критические ресурсы или зависит от других потоков, см. здесь для лучшего объяснения Есть ли способ уничтожить поток?

Лучший способ, как описано здесь, заключается в использовании флагов для обработки ваши перепрошивки, они позволяют простое взаимодействие между потоками

from threading import Event

e = event()

def check_for_stop(e):
    while not e.isSet():
         flash()
    print("Flashing Ended")

while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=check_for_stop, args=(e,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        e.set() #set flag true
        e.clear() #reset flag

вот документация для получения дополнительной информации об объектах событий https://docs.python.org/2.0/lib/event-objects.html

Я не проверял этот код просто пример, поэтому извиняюсь, если он не работает сразу

Редактировать: Просто глядя на вашу функцию еще раз, вы захотите проверить флаг во время перепрошивки, это моя ошибка, извиняется, так что ваша функция fla sh будет выглядеть как

def flash():
    while e.isSet():
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

, и вы передадите это в поток, как раньше

x = threading.Thread(target=flash(), args=(1,))
x.start()
...