Я написал прокси-сервер для перехвата TCP / UDP с использованием Twisted и хочу добавить к нему несколько модульных тестов. Я хочу настроить эхо-протокол, затем отправить некоторые данные через свой прокси-сервер, а затем проверить возвращенный ответ.
Однако, похоже, даже для простого теста с использованием сокета (не говоря уже о моем перехватывающем прокси) для подключения для эха, реактор не кажется порожденным после setUp
- тест зависает навсегда. Если я добавляю тайм-аут к сокету, возникает исключение тайм-аута. Я даже попытался подключиться к ncat
, чтобы убедиться, что виноват не созданный вручную сокет - эхо действительно слушает, но я не получаю эхо-данных обратно клиенту ncat
.
Тестовый код I используйте следующее
import pytest
import socket
from twisted.trial import unittest
from twisted.internet import reactor, protocol
class EchoTCP(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
class EchoTCPFactory(protocol.Factory):
protocol = EchoTCP
class TestTCP(unittest.TestCase):
"""Twisted has its own unittest class
https://twistedmatrix.com/documents/15.2.0/core/howto/trial.html
"""
def setUp(self):
self.iface = "127.0.0.1"
self.data = b"Hello, World!"
# Setup twised echoer
self.port = reactor.listenTCP(
8080,
EchoTCPFactory(),
interface=self.iface
)
def tearDown(self):
self.port.stopListening()
def test_echo(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.iface, self.port.getHost().port))
sent = sock.send(self.data)
data = sock.recv(1024)
sock.close()
assert data == self.data
Для его запуска я использую следующую команду
PYTHONPATH="${PWD}" trial --reactor=default mymodule
Результат следующий и остается таким, пока я не убью процесс
mymodule.test.test_network
TestTCP
test_echo ...
Кажется, я что-то упускаю из-за того, как работает реактор. Я искал похожие примеры, но не смог заставить их работать.
Как мне написать тест, чтобы получить ожидаемое поведение?