У меня есть проект на Python, который я хочу протестировать.У меня уже есть модульное тестирование с unittest
, но мне нужно сделать интеграционные тесты.
Для этого у меня есть два приложения: реальное, которое я должен протестировать, и приложение "тестирования", которое будет отправлять запросы.к первому дождитесь ответа, а затем сравните его с ожидаемым результатом:
Таким образом, я смогу проверить, если приложениеправильно отвечает на запросы.
На данный момент у меня есть то, что я описал выше, но в main.py (не конкретный тестовый файл).Кроме того, сравнение только что сделано с функцией печати, так что я вижу, что это работает.Но я должен выполнить эти тесты и иметь возможность получать результаты в обычном формате, таком как junit xml.
Как я могу написать, выполнить и получить результаты этих тестов?
РЕДАКТИРОВАТЬ
Я занимаюсь разработкой пограничного модуля IoT Azure и использую Route для подключения модулей.Вот код модуля тестирования, где мне нужно выполнить тесты:
import random
import time
import sys
import iothub_client
import json
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
# receive_message_callback is invoked when an incoming message arrives on INPUT queue
def receive_message_callback(message, hubManager):
message_buffer = message.get_bytearray()
size = len(message_buffer)
message_text = message_buffer[:size].decode('utf-8')
data = json.loads(message_text)
result = data["result"]
print ("expected_result: %d; result: %d ==> %r" %(EXPECTED_RESULT, result, EXPECTED_RESULT==result))
class HubManager(object):
def __init__(self, protocol=IoTHubTransportProvider.MQTT):
self.client_protocol = protocol
self.client = IoTHubModuleClient()
self.client.create_from_environment(protocol)
self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
# sets the callback when a message arrives on INPUT queue.
self.client.set_message_callback(INPUT, receive_message_callback, self)
# Forwards the message received onto the next stage in the process.
def forward_event_to_output(self, outputQueueName, event, send_context):
self.client.send_event_async(
outputQueueName, event, send_confirmation_callback, send_context)
def main(protocol):
try:
hub_manager = HubManager(protocol)
# Send request
message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
msg_txt_formatted = IoTHubMessage(message)
hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)
while True:
time.sleep(1)
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubModuleClient sample stopped" )
if __name__ == '__main__':
main(PROTOCOL)