Как переходы Python для роботизированных приложений / встроенных систем - PullRequest
0 голосов
/ 26 октября 2018

Я смотрю на использование переходов для создания конечных автоматов для роботизированных приложений. Ранее я использовал Simulink Stateflow для такого рода задач. Здесь конечный автомат может вызываться периодически, и текущее активное состояние оценивается, чтобы увидеть, может ли быть выполнен выходной переход в новое состояние, в противном случае текущее состояние сохраняется (один переход может быть выполнен за опрос). С этим мышлением я пытаюсь создать первоначальный игрушечный пример с переходами, чтобы увидеть, смогу ли я справиться с ним, но у меня возникает ощущение, что мне нужно думать об этом решении по-другому.

Примером игрушки, о которой я подумал, является один из тех автоматических ворот для паспорта аэропорта, где камера переводится в Z, чтобы соответствовать высоте лица пользователя. Вот моя (надеюсь, самоописанная) попытка:

from transitions import Machine
from time import sleep, time
from random import choice

class Passport_cam_sm(object):

    def __init__(self):

        self.face_ok = False
        self.height_ok = False

states = ['move2face', 'validate_face', 'open_gate']

transitions = [
    {'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face', 'conditions': 'height_ok'},
    {'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 'conditions': 'face_valid'},
    {'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]

class Passport_cam(object):

    def __init__(self, terminate_time_s=10, time_step_s=1):

        self.model = Passport_cam_sm()
        self.machine = Machine(model=self.model, states=states, transitions=transitions, initial='move2face', send_event=True)
        self.running = True
        self.init_time = time()
        self.terminate_time_s = terminate_time_s
        self.time_step_s = time_step_s
        self.face_pos_z = 5
        self.camera_pos_z = 0
        self.error_z = 888

    def run(self):
        '''
        main program loop
        :return: 
        '''

        while self.running and not self.main_timeout():

            self.machine.face_ok = self.face_ok()
            self.machine.height_ok = self.height_ok()
            print ('At state ' + self.model.state) #, ' camera height (', self.error_z ,') ok is ', self.machine.height_ok, ' face check is ', self.machine.face_ok)
            # todo - poll latest state here? (self.model.state)
            self.camera_dynamics()
            sleep(1)

    def face_ok(self):
        '''
        very robust method for determining if the face is valid...
        :return: 
        '''
        return choice([True, False])

    def height_ok(self, tol=0.5):
        '''
        Checks if the face height is OK to do comparison.
        :return: 
        '''
        if abs(self.error_z) < tol:
            return True
        else:
            return False

    def camera_dynamics(self, max_displacement=1):
        '''
        Moves camera height towards face height at a maximum of "max_displacement" per function call 
        :return: 
        '''
        self.error_z = self.camera_pos_z - self.face_pos_z
        threshold_error = (min(max(self.error_z, -max_displacement), max_displacement))
        self.camera_pos_z = self.camera_pos_z - threshold_error
        print ('Camera height error is: {0}'.format(self.error_z))

    def main_timeout(self):
        if time() > self.init_time + self.terminate_time_s:
            return True
        else:
            return False

pc = Passport_cam()
pc.run()

То, что я ожидал найти, это какой-то способ опроса последнего состояния в части 'todo' в коде, чтобы проверить, действительны ли сейчас какие-либо условия выхода. Есть ли способ сделать это? Повторный вход в текущее состояние должен быть в порядке, но я думаю, что какой-то метод "в течение" был бы идеальным.

Иначе есть ли лучший способ структурировать проект такого рода и / или какие-нибудь примеры проектов, подобные этому?

1 Ответ

0 голосов
/ 29 октября 2018

Добро пожаловать в переполнение стека. Я бы предпочел «опросить» переход и подождать, пока он не будет успешно выполнен. Переход вернет True, если он был выполнен успешно, и False, если а) подготовка не удалась, б) условия не были выполнены или в) что-то не удалось во время входа в состояние или г) проблемы, возникающие при обработке after События. Я сузил ваш пример кода, чтобы проиллюстрировать, как его можно использовать для проверки того, были ли face и height успешно получены:

from transitions import Machine
from time import sleep
from random import choice

class Passport_cam_sm(object):

    def __init__(self):
        self._face_okay = False
        self._height_okay = False

    def face_ok(self, even_data):
        self._face_okay = choice([True, False])
        return self._face_okay

    def height_ok(self, even_data):
        # tol = even_data.kwargs.pop('tol', 0.5)
        self._height_okay =  choice([True, False])
        return self._height_okay

states = ['move2face', 'validate_face', 'open_gate']

transitions = [
    {'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face',
     'conditions': ['height_ok', 'face_ok']},
    {'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 
     'conditions': 'face_valid'},
    {'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]


class Passport_cam(object):

    def __init__(self):

        self.model = Passport_cam_sm()
        self.machine = Machine(model=self.model, states=states, transitions=transitions,
                               initial='move2face', send_event=True)
        self.running = True

    def run(self):
        while self.running:
            print('At state ' + self.model.state)
            # model.at_face will only return True when both conditions are met
            while not self.model.at_face():
                print('Checking ...') 
                sleep(1)
            print('Face and height are okay!')
            self.camera_dynamics()
            sleep(1)

    def camera_dynamics(self):
        print("Processing...")
        self.running = False


pc = Passport_cam()
pc.run()
print('Done')

Я добавил обе проверки (face / height_ok) в качестве условий для допустимого перехода. Если вы хотите назначить их первыми и проверить только их значения в conditions, вы можете использовать ключевое слово переходов prepare. Функции / методы в режиме подготовки будут выполняться до conditions и не требуют логического возвращаемого значения. Когда вы указываете send_event=True, все обратные вызовы должны ожидать этого события. Вот почему face/height_ok требует подписи, использованной выше. Аргументы, передаваемые триггерному событию (например, model.at_face(tol=0.5)), будут присвоены event_data.args или event_data.kwargs.

Обратите внимание, что я назначил проверку ваших условий модели. Строки всегда предполагаются как имена методов модели. Если вы хотите назначать функции / методы откуда-то еще, вы можете передавать ссылки на эти функции вместо строк. Также обратите внимание, что это будет работать только тогда, когда события обрабатываются мгновенно. transitions поддерживает обработку событий в очереди (передача queued=True в конструктор Machine), что удобно, когда события могут инициировать другие события. Когда queued=True события будут ВСЕГДА возвращать истину.

Как можно проводить опрос агностиком?

Главной особенностью конечного автомата является способность адаптировать его поведение в зависимости от его текущего состояния. Одно и то же событие может привести к разным результатам. Если вы хотите, чтобы ваша машина постоянно опрашивала, а не реагировала на события, вы можете определить все переходы, которые будут запускаться одним и тем же событием:

  transitions = [ 
      {'trigger': 'check', 'source': 'move2face', 'dest': 'validate_face',
     'conditions': ['height_ok', 'face_ok']},
      {'trigger': 'check', 'source': 'validate_face', 'dest': 'open_gate', 
     'conditions': 'face_valid'}, # (1)
      {'trigger': 'check', 'source': 'validate_face', 'dest': 'move2face'}, # (2)
  ]
  ...
  # class Passport_cam
  def run(self):
        while self.running:
            print('At state ' + self.model.state)
            while not self.model.check():
                sleep(1)

Цикл можно упростить до простого вызова model.check. Таким образом, переходы, проверки и состояния могут быть введены без необходимости изменения цикла опроса.

Переходы выполняются в порядке их добавления. Это означает, что (1) и (2) образуют правило In state 'validate_face' go to 'open_gate' if 'face_valid', otherwise go to 'move2face'. Переходы с исходным состоянием, отличным от текущего состояния, не должны проверяться и не приводят к значительным накладным расходам.

Что наиболее важно, так это общий дизайн состояний и переходов машины, который не зависит от структуры. Если проверки условий раздуты, рассмотрите возможность разделения состояний на более мелкие функционально более конкретные состояния. Вы также можете разделить конфигурации перехода и / или составить свою модель из нескольких специализированных моделей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...