Как сделать функциональные тесты для Kafka Streams с Avro (schemaRegistry)? - PullRequest
0 голосов
/ 10 октября 2018
  • Краткое объяснение того, чего я хочу достичь: я хочу сделать функциональные тесты для топологии потока Кафки (используя TopologyTestDriver) для записей avro.

  • Проблемы: Не удается «смоделировать» schemaRegistry для автоматизации публикации / чтения схемы

До сих пор я пытался использовать MockSchemaRegistryClient, чтобы попытаться смоделировать schemaRegistry, но я не знаю, какчтобы связать его с Avro Serde.

Код

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

Метод AvroSerde

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

Если я запускаю тест без testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));, он работает хорошо (выглядит каквсе правильно установлено)

Но

Когда я пытаюсь вставить данные ( pipeInput ), выдается следующее исключение: Объект «Отслеживание» заполнен полностью.

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Отредактировано, я не удалил это, для "журнала истории", чтобы указать путь.

Ответы [ 4 ]

0 голосов
/ 17 июня 2019

Для этого я создал небольшую тестовую библиотеку на основе testcontainers: https://github.com/vspiliop/embedded-kafka-cluster. Запускает полностью настраиваемый кластер Kafka на основе докера (брокер, zookeeper и Confluent Schema Registry) как часть ваших тестов.Посмотрите на пример тестовых модулей и огурцов.Основное отличие от других решений, основанных на Docker, заключается в том, что файл составления Docker «генерируется» с помощью параметров аннотации @EmbeddedKafkaCluster и не жестко закодирован.

Например, вы можете использовать аннотацию @EmbeddedKafkaCluster следующим образом:

@ContextConfiguration()
@EmbeddedKafkaCluster(topics = {"test.t"}, brokersCount = 1, zookeepersCount = 1, schemaRegistriesCount = 1)
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
public class FeatureSteps {
0 голосов
/ 10 октября 2018

Отказ от ответственности: я не проверял это.Я просто делюсь некоторыми идеями о том, как вы могли бы заставить это работать.Надеюсь это поможет.Если вы можете оставить отзыв об этом ответе, было бы здорово найти правильное и рабочее решение.

Я не думаю, что вы можете использовать обычный Avro Serde через config:

props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());

Насколько я понимаю, он попытается подключиться к

props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

Однако при использовании MockSchemaRegistryClient нет конечной точки http для подключения.Вместо этого вам нужно передать фиктивный клиент в Serde при его создании:

MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
// add the schemas you want to use
schemaRegistryClient.register(...);
SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);

Таким образом, вы просто настраиваете «фиктивную» конечную точку http, потому что предоставляющий имитирующий клиент все равно не будет ее использовать.

Передача соответствующего Serde с помощью кода, подобного приведенному здесь, кажется правильной:

StreamBuilder.stream("topic", Consumed.with(Serdes.String(), avroSerde));
0 голосов
/ 10 октября 2018

Подход, который сработал для нас лучше всего, это java-тестовые контейнеры с слитными образами докеров платформы.Вы можете настроить следующий файл составления Docker:

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    environment:
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
      - kafka

Единственное, что вам нужно сделать, это добавить 127.0.0.1 kafka к /etc/hosts.При таком подходе у вас есть практически весь кластер, готовый к тестированию интеграции.Кластер будет уничтожен после завершения интеграционного теста.

РЕДАКТИРОВАТЬ:

Лучше docker-compose без фактического изменения /etc/hosts

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
    extra_hosts:
      - "moby:127.0.0.1"

Kafkaбудет доступно на локальном хосте: 9092

0 голосов
/ 10 октября 2018

Confluent предоставляет множество примеров кода для тестирования Kafka (Streams) вместе с реестром схемы.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Самое главное, что имитация не является полным интеграционным тестом - запускФактический брокер Kafka с реестром схемы памяти.

В приведенном выше коде см.

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

И

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
...