Pytest - потребитель и производитель KAFKA (интеграционный тест) - PullRequest
0 голосов
/ 05 февраля 2020

Я новичок в фреймворке Pytest. В настоящее время у меня есть приложение Python, которое использует и генерирует сообщения, используя Kafka- Python. Я пытаюсь написать интеграционные тесты, которые бы проверяли, были ли они получены и произведены из / в топи c. В настоящее время я использую pytest-docker для ускорения контейнера Kafka с производителем и потребителем topi c. Я сталкиваюсь с периодически возникающими ошибками "NoBrokersAvailable, нераспознанная версия брокера" Я не уверен, что я делаю неправильно. Любая помощь будет принята с благодарностью.

docker -compose.yml:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  create-consumer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-consumer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored

  create-producer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-producer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      kafka-console-producer
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored

conftest.yml:

@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
    return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml")


@pytest.fixture(scope="session")
def producer(docker_services):
    producer = Producer(config=KAFKA_PRODUCER_CONFIG, key_serializer=str.encode,
                        value_serializer=str.encode, alerter=None)
    sleep(4)
    return producer

test.py:

def test__producer_sends_to_topic(producer):
    producer.open()
    producer.send(Payload(key="hello", value="world"))
    assert <code to assert>

Производитель:

class Producer:
    """
    Wrapper for kafka producer.
    This wrapper handles creating, closing and sending messages to topic.
    """

    def __init__(self, config: Dict[str, str], key_serializer, value_serializer, alerter=None):
        self.alerter = alerter
        self.config = config
        self.retries = config['retries']
        self.topic = config['topic']
        self.seconds_between_tries = config['seconds_between_tries']
        self.producer = None
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    def _get_context(self) -> SSLContext:
        context = create_default_context(
            purpose=Purpose.CLIENT_AUTH,
            cafile=self.config['ca_file'])
        context.load_cert_chain(
            self.config['cert_file'],
            password=self.config['password'])
        return context

    def open(self) -> KafkaProducer:
        """ Create the underlying KafkaProducer """
        if self.config.get('ssl_enabled', True):
            ssl_context = self._get_context()
        else:
            ssl_context = None

        self.producer = KafkaProducer(
            ssl_context=ssl_context,
            acks=self.config['acks'],
            client_id=self.config['client_id'],
            bootstrap_servers=self.config['bootstrap_servers'],
            security_protocol=self.config['security_protocol'],
            ssl_check_hostname=self.config['ssl_check_hostname'],
            key_serializer=self.key_serializer,
            value_serializer=self.value_serializer,
            request_timeout_ms=self.config['request_timeout_ms'],
            max_block_ms=self.config['max_block_ms'],
            batch_size=self.config['batch_size'],
            linger_ms=self.config['linger_ms'],
            buffer_memory=self.config['buffer_memory']
        )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...