У меня есть класс, который управляет поворотным механическим энкодером. Сторона обновления (состояние чтения, вывод инкремента и обновление свойства steps
) обрабатываются в отдельном потоке, а обновленное значение step
передается в get_steps
. Функция button_callback
также обрабатывается в отдельном потоке по милости GPIO.add_event_detect
. В соответствии с Часто задаваемые вопросы по Python 3 я использовал два мьютекса, потому что может быть неопределенность, когда свойства считываются или сбрасываются в основном потоке:
Первое - это управление выполнением переданной функции button_callback
нажатием кнопки кодера - переданная функция, скорее всего, отредактирует глобальную переменную, на которую могут воздействовать и другие элементы основного потока. Я понимаю, что button_lock
нужно будет развернуть в другом месте основного потока для связанных переменных, но я вполне уверен, что это разумно.
Вторым является управление доступом к свойству steps
, в частности, для остановки метода reset
(потенциально вызываемого основным потоком), изменяющего свойство step во время цикла обновления. Я не уверен, что какие-либо операции здесь и в методе get_steps
являются атомарными, или необходим ли мьютекс.
import RPi.GPIO as GPIO
import math
from threading import Thread, Lock
import time
class Encoder:
enc_A = 27
enc_B = 17
switch = 4
def __init__(self, button_callback=None, button_args=None):
GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.enc_A, GPIO.IN)
GPIO.setup(self.enc_B, GPIO.IN)
if button_callback is not None:
GPIO.setup(self.switch, GPIO.IN)
self.button_lock = Lock()
def cb(channel):
with self.button_lock:
return button_callback(channel, button_args) if button_args is not None else button_callback(channel)
GPIO.add_event_detect(self.switch, GPIO.FALLING, callback=cb)
self.lock = Lock()
self.r_seq = self.get_rseq()
self.last_delta = 0
self.reset()
self.threadRun = False
self.start()
def get_rseq(self):
a = GPIO.input(self.enc_A)
b = GPIO.input(self.enc_B)
return (a ^ b) | b << 1
def update(self):
while self.threadRun:
r_seq = self.get_rseq()
delta = 0
if r_seq != self.r_seq:
delta = (r_seq - self.r_seq) % 4
if delta == 3:
delta = -1
elif delta == 2:
delta = int(math.copysign(delta, self.last_delta))
self.last_delta = delta
self.r_seq = r_seq
with self.lock:
self.steps += delta
time.sleep(0.002)
GPIO.remove_event_detect(self.switch)
GPIO.cleanup()
def get_steps(self):
with self.lock:
return self.steps
def reset(self):
with self.lock:
self.steps = 0
def stop(self):
self.threadRun = False
def start(self):
self.threadRun = True
Thread(target=self.update).start()
Я надеюсь сделать этот код как можно более простым и Pythonic, и это включает в себя правильное использование мьютекса. Для моего использования этот код работает, но я очень заинтересован в том, чтобы стремиться к хорошей практике, поэтому я готов принять любой совет в этом случае.
Примечание: обработка кодера основана на py-gaugette - кредит автору.