moto for python - запись в sqs не удалась - PullRequest
0 голосов
/ 25 февраля 2020

У меня есть метод в моем коде, который отправляет сообщение в sqs. я хочу использовать мото и использовать aws sqs service.

Ниже мой код

 def posttosqs(self,url,body):
    try:

        sqs_cli = boto3.client('sqs')
        sqs_cli.send_message(QueueUrl=url, MessageBody=body)
    except Exception as e:

        raise Exception("Posting failed to SQS")

вот мой тестовый пример

    @mock_sqs
    @mock_s3
    def test_case_use_moto(self):

        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='Test')
        conn = boto3.client('sqs', region_name='us-east-1')
        queue = conn.create_queue(QueueName='Test')

        os.environ["SQS_URL"] = queue["QueueUrl"]
        conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
        #SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
        ctx = context_class_object()
        event = {"body": "test"}
        resp = lambda.handle_request(event, ctx)
        assert resp["statusCode"] == 200

the conn.send_message работает в тестовом примере, но метод posttosqs завершается неудачно с

error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.

Мне удалось успешно протестировать операции S3, используя описанный выше метод, но не операцию SQS

...