Q: прослушиватель событий и автоматическое подтверждение с помощью boto3 - PullRequest
0 голосов
/ 11 марта 2019

В настоящее время я создаю микро-сервис, приложение опроса SQS, которое получает JSON-файл из (в данный момент эластичного mq), запускает его через программу и затем передает результат в другую очередь сообщений.

Код, который я написал прямо сейчас, я думаю, это не лучший практический код, но я не могу определить, как правильно написать слушатель через документы, хотя у меня есть сервис, который использует AWS-SDK в Java (я я новичок в Python), который делает то, что я хочу - слушатель событий, который автоматически подтверждает каждое полученное сообщение.

В настоящее время у меня есть 2 проблемы: я хочу создать слушателя с модулем boto3, который ожидает события, когда событие всплывает, он получает сообщение и автоматически его подтверждает - я хочу прочитать много сообщений, но когда boto3 связывается с очередью, потом не удаляет их.

В настоящее время происходит следующее: у меня есть скрипт, который выполняется бесконечно и выполняет приемный вызов; если приемный вызов НЕ является пустым списком, он очищает очередь от сообщений и запускает ее через нашу программу, а затем распространяет обработанный данные в другую очередь.

Код выглядит так: само приложение.

class App(object):
def __init__(self):
    time.sleep(10)
    self.detector = FaceDetector()
    self.MQHandler = MQHandler()  # Todo - setup
    self.run()

"""
Restart the process
"""
def restart(self):
    self.detector = FaceDetector()
    self.run()

def run(self):
    #Receive from MQ -> stores data in paths variables.
    response = self.MQHandler.receive_from_feeder_service()

    ##Load files
    if len(response) > 0:
        paths = self.translate_JSON_to_list(response[0].body)
        fl = FileLoader(paths).get()
        ##Iterate files and run detection
        results = list(map(lambda f: self.detector.containsFace(f), fl))

        ##Send to MQ
        self.MQHandler.send_to_storage(self.translateToJSON(paths, results))

        ## restart process
        self.restart()
    else:
        self.restart()

очередь сообщений

class MQHandler(object):
def __init__(self):
    self.endpoint = "http://mq:9324"
    self.region = "elasticmq"
    self.access_key = "x"
    self.secret_key = "x"
    self.feeder_queue = "imagePaths"
    self.storage_queue = "storage"

    self.resource = boto3.resource('sqs',
                                   endpoint_url=self.endpoint,
                                   region_name=self.region,
                                   aws_access_key_id=self.access_key,
                                   aws_secret_access_key=self.secret_key,
                                   use_ssl=False)


def receive_from_feeder_service(self):
    queue = self.resource.get_queue_by_name(QueueName=self.feeder_queue)
    receivedMessage = queue.receive_messages(WaitTimeSeconds=20)
    queue.purge()
    return receivedMessage

def send_to_storage(self, message):
    queue = self.resource.get_queue_by_name(QueueName=self.storage_queue)
    queue.send_message(MessageBody=message)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...