I'm new to the pytest framework. I currently have a Python application that produces and consumes messages using kafka-python. I'm trying to write integration tests that verify messages are actually consumed from and produced to a topic. I'm currently using pytest-docker to spin up a Kafka container together with the producer and consumer topics. I keep hitting intermittent "NoBrokersAvailable" and "UnrecognizedBrokerVersion" errors and I'm not sure what I'm doing wrong. Any help would be greatly appreciated.
docker-compose.yml:
version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  create-consumer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-consumer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored

  create-producer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-producer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
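To rule out a listener misconfiguration, here is a minimal connectivity check that can be run from the host against the external listener (the 127.0.0.1:9092 address follows from the compose file above; the 5-second timeout is an arbitrary choice):

# check_broker.py -- sketch: probe the broker's external listener from the host.
from kafka import KafkaAdminClient
from kafka.errors import NoBrokersAvailable

try:
    admin = KafkaAdminClient(
        bootstrap_servers="127.0.0.1:9092",
        request_timeout_ms=5000,
    )
    print("Broker reachable, topics:", admin.list_topics())
    admin.close()
except NoBrokersAvailable:
    print("No brokers available -- listener not reachable from the host yet")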
conftest.py:
@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml")
@pytest.fixture(scope="session")
def producer(docker_services):
producer = Producer(config=KAFKA_PRODUCER_CONFIG, key_serializer=str.encode,
value_serializer=str.encode, alerter=None)
sleep(4)
return producer
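I suspect the hard-coded sleep(4) is the fragile part. This is the readiness wait I'm considering instead, built on pytest-docker's docker_services.wait_until_responsive and docker_ip; the bootstrap_connected() probe via kafka-python (needs kafka-python >= 1.4.7) is my own assumption:

import pytest
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable


def _kafka_responsive(bootstrap: str) -> bool:
    # True once kafka-python completes the bootstrap handshake with the broker.
    try:
        consumer = KafkaConsumer(bootstrap_servers=bootstrap)
        connected = consumer.bootstrap_connected()
        consumer.close()
        return connected
    except NoBrokersAvailable:
        return False


@pytest.fixture(scope="session")
def producer(docker_services, docker_ip):
    # Poll until the broker actually answers instead of hoping 4 seconds is enough.
    docker_services.wait_until_responsive(
        timeout=60.0, pause=1.0,
        check=lambda: _kafka_responsive(f"{docker_ip}:9092"))
    return Producer(config=KAFKA_PRODUCER_CONFIG, key_serializer=str.encode,
                    value_serializer=str.encode, alerter=None)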
test.py:
def test__producer_sends_to_topic(producer):
    producer.open()
    producer.send(Payload(key="hello", value="world"))
    assert <code to assert>
The producer:
from ssl import SSLContext, Purpose, create_default_context
from typing import Dict

from kafka import KafkaProducer


class Producer:
    """
    Wrapper for kafka producer.
    This wrapper handles creating, closing and sending messages to topic.
    """

    def __init__(self, config: Dict[str, str], key_serializer, value_serializer, alerter=None):
        self.alerter = alerter
        self.config = config
        self.retries = config['retries']
        self.topic = config['topic']
        self.seconds_between_tries = config['seconds_between_tries']
        self.producer = None
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    def _get_context(self) -> SSLContext:
        context = create_default_context(
            purpose=Purpose.CLIENT_AUTH,
            cafile=self.config['ca_file'])
        context.load_cert_chain(
            self.config['cert_file'],
            password=self.config['password'])
        return context

    def open(self) -> KafkaProducer:
        """ Create the underlying KafkaProducer """
        if self.config.get('ssl_enabled', True):
            ssl_context = self._get_context()
        else:
            ssl_context = None
        self.producer = KafkaProducer(
            ssl_context=ssl_context,
            acks=self.config['acks'],
            client_id=self.config['client_id'],
            bootstrap_servers=self.config['bootstrap_servers'],
            security_protocol=self.config['security_protocol'],
            ssl_check_hostname=self.config['ssl_check_hostname'],
            key_serializer=self.key_serializer,
            value_serializer=self.value_serializer,
            request_timeout_ms=self.config['request_timeout_ms'],
            max_block_ms=self.config['max_block_ms'],
            batch_size=self.config['batch_size'],
            linger_ms=self.config['linger_ms'],
            buffer_memory=self.config['buffer_memory']
        )
        return self.producer  # return was missing; the annotation promises a KafkaProducer
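Not shown above are the send and close wrappers and the Payload container; they trimmed out of the snippet, but roughly they look like this (simplified sketch: Payload as a dataclass and the blocking future.get ack are my own assumptions, my real send also retries using self.retries and self.seconds_between_tries):

from dataclasses import dataclass


@dataclass
class Payload:
    key: str
    value: str


class Producer:
    # __init__, _get_context and open are the same as above; trimmed here.

    def send(self, payload: Payload) -> None:
        """ Send one payload to the configured topic and block on the ack. """
        future = self.producer.send(self.topic, key=payload.key, value=payload.value)
        future.get(timeout=self.config['request_timeout_ms'] / 1000)

    def close(self) -> None:
        """ Flush buffered messages and close the underlying KafkaProducer. """
        if self.producer is not None:
            self.producer.flush()
            self.producer.close()
            self.producer = None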