У меня есть метод в моем коде, который отправляет сообщение в sqs. я хочу использовать мото и использовать aws sqs service.
Ниже мой код
def posttosqs(self,url,body):
try:
sqs_cli = boto3.client('sqs')
sqs_cli.send_message(QueueUrl=url, MessageBody=body)
except Exception as e:
raise Exception("Posting failed to SQS")
вот мой тестовый пример
@mock_sqs
@mock_s3
def test_case_use_moto(self):
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='Test')
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
#SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
ctx = context_class_object()
event = {"body": "test"}
resp = lambda.handle_request(event, ctx)
assert resp["statusCode"] == 200
the conn.send_message
работает в тестовом примере, но метод posttosqs
завершается неудачно с
error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.
Мне удалось успешно протестировать операции S3, используя описанный выше метод, но не операцию SQS