В настоящее время я создаю микро-сервис, приложение опроса SQS, которое получает JSON-файл из (в данный момент эластичного mq), запускает его через программу и затем передает результат в другую очередь сообщений.
Код, который я написал прямо сейчас, я думаю, это не лучший практический код, но я не могу определить, как правильно написать слушатель через документы, хотя у меня есть сервис, который использует AWS-SDK в Java (я я новичок в Python), который делает то, что я хочу - слушатель событий, который автоматически подтверждает каждое полученное сообщение.
В настоящее время у меня есть 2 проблемы: я хочу создать слушателя с модулем boto3, который ожидает события, когда событие всплывает, он получает сообщение и автоматически его подтверждает - я хочу прочитать много сообщений, но когда boto3 связывается с очередью, потом не удаляет их.
В настоящее время происходит следующее: у меня есть скрипт, который выполняется бесконечно и выполняет приемный вызов; если приемный вызов НЕ является пустым списком, он очищает очередь от сообщений и запускает ее через нашу программу, а затем распространяет обработанный данные в другую очередь.
Код выглядит так: само приложение.
class App(object):
def __init__(self):
time.sleep(10)
self.detector = FaceDetector()
self.MQHandler = MQHandler() # Todo - setup
self.run()
"""
Restart the process
"""
def restart(self):
self.detector = FaceDetector()
self.run()
def run(self):
#Receive from MQ -> stores data in paths variables.
response = self.MQHandler.receive_from_feeder_service()
##Load files
if len(response) > 0:
paths = self.translate_JSON_to_list(response[0].body)
fl = FileLoader(paths).get()
##Iterate files and run detection
results = list(map(lambda f: self.detector.containsFace(f), fl))
##Send to MQ
self.MQHandler.send_to_storage(self.translateToJSON(paths, results))
## restart process
self.restart()
else:
self.restart()
очередь сообщений
class MQHandler(object):
def __init__(self):
self.endpoint = "http://mq:9324"
self.region = "elasticmq"
self.access_key = "x"
self.secret_key = "x"
self.feeder_queue = "imagePaths"
self.storage_queue = "storage"
self.resource = boto3.resource('sqs',
endpoint_url=self.endpoint,
region_name=self.region,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
use_ssl=False)
def receive_from_feeder_service(self):
queue = self.resource.get_queue_by_name(QueueName=self.feeder_queue)
receivedMessage = queue.receive_messages(WaitTimeSeconds=20)
queue.purge()
return receivedMessage
def send_to_storage(self, message):
queue = self.resource.get_queue_by_name(QueueName=self.storage_queue)
queue.send_message(MessageBody=message)