Pepper NAOqi 2.5: Как создать функцию обратного вызова из события? - PullRequest
1 голос
/ 27 мая 2020

Я работаю над Pepper из SoftBank Robotics, и я пытаюсь реализовать программу, которая может интегрировать функцию обратного вызова, когда событие «Navigation / AvoidanceNavigator / ObstacleDetected» становится ИСТИННЫМ. , Я попытался обратиться к руководству, но в разделе Reacting to Events SDK Python есть только пример, использующий событие FaceDetect.

К сожалению, в отличие от события FaceDetect, который является частью службы ALFaceDetection, имеющей метод подписки, событие «Navigation / AvoidanceNavigator / ObstacleDetected» связано с API ALNavigation, у которого нет подписчика.

Кто-нибудь может мне с этим помочь?

Вот моя первая (не рабочая) идея:

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-


from naoqi import ALProxy
import qi
import argparse
import sys
import numpy
from PIL import Image
import os
import time


class tryGetEvent(object):


    def __init__(self, app):
        super(tryGetEvent, self).__init__()
        app.start()                 #start the application (see qi.ApplicationAPI)
        session=app.session         #return the current session

        # Open the service ALMemory

        self.memory=session.service("ALMemory")

        #Connect to the event callback
        self.subscriber=self.memory.subscriber("Navigation/AvoidanceNavigator/ObstacleDetected")
        self.subscriber.signal.connect(self.obstacleDet)

        self.tts=session.service("ALTextToSpeech")
        self.navigation=session.service("ALNavigation")
        self.navigation.subscribe("tryGetEvent")
        self.got_obst=False

    def obstacleDet(self, position):

        if position==[]: #empty value
            self.got_obst=False
        elif not self.got_obst:
            self.got_obst=True
            print "Obstacle detected!"
            self.tts.say("Obstacle detected!")

            pos_plot=position [1]

            print (pos_plot)

    def run(self):

        print("Starting script tryGetEvent")
        try:
            while True:
                time.sleep(0.2)
        except KeyboardInterrupt:
            print("user interrupted the script")
            self.navigation.unsubscribe("tryGetEvent")
            sys.exit(0)



# DO NOT TOUCH !

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="192.168.1.32",
                        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    try:
        # Initialize qi framework.
        connection_url = "tcp://" + args.ip + ":" + str(args.port)
        app = qi.Application(["tryGetEvent", "--qi-url=" + connection_url])
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
               "Please check your script arguments. Run with -h option for help.")
        sys.exit(1)


    Get_Event=tryGetEvent(app)
    Get_Event.run()

1 Ответ

0 голосов
/ 02 июля 2020

Мне жаль, что пример в руководстве неясен, но есть более простой способ написать этот код.

obstacle_tracker.py:

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
import qi

class ObstacleTracker:
    def __init__(self, session):
        self.session = session
        self.got_obst = False
        memory = session.service("ALMemory")
        subscriber = memory.subscriber("Navigation/AvoidanceNavigator/ObstacleDetected")
        subscriber.signal.connect(self.obstacleDet)

    def obstacleDet(self, position):
        if position == []: #empty value
            self.got_obst = False
        elif not self.got_obst:
            self.got_obst = True
            print("Obstacle detected!")
            tts = self.session.service("ALTextToSpeech")
            tts.say("Obstacle detected!")
            print(position)


if __name__ == "__main__":
    app = qi.Application()
    app.start()
    obstacle_tracker = ObstacleTracker(app.session)
    app.run()

Запустите его с помощью: $ python obstacle_tracker.py --qi-url=192.168.1.32

...