Моя цель с этими кодами ниже - подключиться к устройству через TCP / IP-соединение и получить некоторые данные с этого устройства. Я хотел бы разработать с TDD. Поэтому я также пишу тестовые классы для этого класса, используя модуль unittest.
Класс устройства
class Device():
def __init__(self, dev=None, **kwargs):
if device:
self.ip = dev.ip
self.port = dev.port
elif 'ip' in kwargs and 'port' in kwargs:
self.ip = kwargs['ip']
self.port = kwargs['port']
else:
raise ValueError("Dev object or ip and port must be provided!")
def is_active(self):
"""Check if device is active"""
with socket.socket() as s:
try:
s.connect((self.ip, self.port))
s.sendall(b'temperature')
return True
except Exception:
return False
def get(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.ip, self.port))
print("connected")
s.sendall(b'Hello, world')
data = s.recv(1024)
return self.data
Класс теста
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
def sample_device_model(ip='127.0.0.1', port=3306):
return Dev.objects.create(
ip=ip,
port=port,
name='testname'
)
def run_fake_server():
HOST = '127.0.0.1' # Standard loopback interface address (localhost)
PORT = 3306 # Port to listen on (non-privileged ports are > 1023)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
conn, addr = s.accept()
with conn:
log = logging.getLogger("TestLog")
log.debug('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
class TestDevice(TestCase):
def setUp(self):
self.server_thread = threading.Thread(target=run_fake_server, daemon=True)
self.server_thread.start()
def tearDown(self):
self.server_thread.join()
def test_is_active_property_working_server(self):
"""Test if is_active function working properly
while server running"""
ip = '127.0.0.6'
port = 3306
device = Device(
ip=ip,
port=port
)
self.assertTrue(device.is_active())
def test_is_active_property_not_working_server(self):
"""Test is is_active function working properly
while server not running"""
ip = '127.0.0.5'
port = 3306
device = Device(ip=ip, port=port)
self.assertFalse(device.is_active())
def test_is_active_property_working_server_using_device(self):
"""Test if is_active function working properly
while server running"""
device = sample_device_model()
device = Device(
dev=dev
)
self.assertTrue(device.is_active())
def test_is_active_property_not_working_server_using_device(self):
"""Test is is_active function working properly
while server not running"""
dev = sample_device_model(ip='127.0.0.5')
device = Device(
dev=dev
)
self.assertFalse(miner.is_active())
Когда я запускаю этот тест, он начинается, а затем просто ждет. Вывод на консоль следующий:
Creating test database for alias 'default'...
DEBUG:asyncio:Using selector: EpollSelector
System check identified no issues (0 silenced).
Скорее всего, фальшивый сервер не принимает соединения. Это из-за шага? Как я могу решить эту проблему?
Есть ли другой способ выполнить модульное тестирование TCP-клиента?
Спасибо,