Kafka: Consumer api: регрессионный тест не пройден, если выполняется в группе (последовательно) - PullRequest
1 голос
/ 24 января 2020

Я реализовал приложение kafka , используя потребительские API . И у меня есть 2 регрессионных теста, реализованных с помощью потока api :

  1. Чтобы проверить счастливый путь: путем получения данных из теста (во входные данные topi * 1074) * что приложение слушает), которое будет использовано приложением, и приложение будет выдавать данные (в выходные данные topi c), которые тест будет использовать и проверять на соответствие ожидаемым выходным данным.
  2. Чтобы проверить путь ошибки: поведение такое же, как указано выше. Хотя в этот раз приложение будет выводить данные в выходные данные topi c, а тест будет использовать из-за ошибки приложения topi c и будет проверять на наличие ожидаемой ошибки.

Мой код и коды регрессионного теста проживает в том же проекте в ожидаемой структуре каталогов. Оба времени (для обоих тестов) данные должны были быть получены одним и тем же слушателем на стороне приложения.

Проблема:

Когда я выполняя тесты индивидуально (вручную), каждый тест проходит . Однако, если я выполню их вместе, но последовательно (например, чистая сборка), только первый тест будет пройден. 2-й тест завершается неудачно после опроса потребителем на стороне теста, и через некоторое время он перестает находить какие-либо данные.

Наблюдение:

После отладки он Похоже, с первого раза все работает отлично (производители и потребители на стороне тестирования и на стороне приложения). Однако во время 2-го теста кажется, что потребитель на стороне приложения не получает никаких данных (кажется, что производитель на стороне тестирования производит данные, но не может этого сказать наверняка) и, следовательно, нет данных выводится ошибка topi c.

То, что я пробовал до сих пор:

После исследований я понимаю, что мы попадаем в условия гонки и чтобы избежать таких предложений, как:

  • используйте @ DirtiesContext (classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
  • Отрывайте брокера после каждого теста (см. «.destry ()» для брокеров)
  • использовать разные имена topi c для каждого теста

Я применил их все и до сих пор не могу исправить свою проблему.

Я предоставляю здесь код для прочтения. Любое понимание приветствуется.

Код для 1-го теста (путь ошибки тестирования):

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.ERROR_TOPIC
        },

        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationFailurePathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;


    //To read from output error
    @Autowired
    protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;


    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}

.

   @TestConfiguration
   public  class AdapterStreamFailurePathTestConfig {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.adapter.application-id}")
    private String applicationId;

    @Value("${spring.kafka.adapter.group-id}")
    private String groupId;

    //Producer of records that the program consumes
    @Bean
    public Map<String, Object> sendEmailCmdProducerConfigs() {
        Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
        results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
        return results;
    }

    @Bean
    public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
        return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
    }

    @Bean
    public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
        return new KafkaTemplate<>(inputProducerFactory());
    }


    //Consumer of the error output, generated by the program
    @Bean
    public Map<String, Object> outputErrorConsumerConfig() {
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
        DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
                new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
        return rpf.createConsumer(groupId, "notification-failure");
    }


}

.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {

@Autowired
private DataGenaratorForErrorPath400Test datagen;

@Mock
private AdapterHttpClient httpClient;

@Autowired
private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;


@Before
public void setup() throws InterruptedException {

    Mockito.when(httpClient.callApi(Mockito.any()))
            .thenReturn(
                    new GenericResponse(
                            400,
                            TestConstants.ERROR_MSG_TO_CHK));
    Mockito.when(httpClient.createURI(Mockito.any(),Mockito.any(),Mockito.any())).thenCallRealMethod();

    inputProducerTemplate.send(
            projectProerties.getInputTopic(),
            datagen.getKey(),
            datagen.getEmailCmdToProduce());
    System.out.println("producer: "+ projectProerties.getInputTopic());

    subscribe(outputErrorConsumer , projectProerties.getErrorTopic(), 0);

}

@Test
public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {

    ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
    List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();

    int attempt = 0;
    int expectedRecords = 1;
    do {
        records = KafkaTestUtils.getRecords(outputErrorConsumer);
        records.forEach(outputListOfErrors::add);
        attempt++;
    } while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);

    //Verify the recipient event stream size
    Assert.assertEquals(expectedRecords, outputListOfErrors.size());

    //Validate output

}

@After
public void tearDown() {
    outputErrorConsumer.close();
    embeddedFailurePathKafkaBroker.destroy();
}

}

2-й тест по структуре практически одинаков. Хотя на этот раз потребитель на стороне тестирования потребляет от application-side-output-topi c (вместо ошибки topi c). И я назвал потребителей, брокера, производителя по-разному. Как:

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.OUTPUT_TOPIC
        },
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }

)
public class AbstractIntegrationSuccessPathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey,SendEmailCmd> sendEmailCmdProducerTemplate;

    //To read from output regular topic
    @Autowired
    protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> ouputConsumer;

    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}

Пожалуйста, дайте мне знать, если я должен предоставить дополнительную информацию.,

Ответы [ 2 ]

1 голос
/ 24 января 2020

"port = 9092"

Не используйте фиксированный порт; оставьте это, и встроенный брокер будет использовать случайный порт; пользовательские конфиги установлены в KafkaTestUtils для указания на случайный порт.

Вам не нужно портить контекст после каждого метода теста - используйте разные group.id для каждого теста и разные topic.

0 голосов
/ 21 февраля 2020

В моем случае потребитель не был закрыт должным образом. Я должен был сделать:

@After
public void tearDown() {
  // shutdown hook to correctly close the streams application
  Runtime.getRuntime().addShutdownHook(new Thread(ouputConsumer::close));
}

, чтобы решить.

...