Я реализовал приложение kafka , используя потребительские API . И у меня есть 2 регрессионных теста, реализованных с помощью потока api :
- Чтобы проверить счастливый путь: путем получения данных из теста (во входные данные topi * 1074) * что приложение слушает), которое будет использовано приложением, и приложение будет выдавать данные (в выходные данные topi c), которые тест будет использовать и проверять на соответствие ожидаемым выходным данным.
- Чтобы проверить путь ошибки: поведение такое же, как указано выше. Хотя в этот раз приложение будет выводить данные в выходные данные topi c, а тест будет использовать из-за ошибки приложения topi c и будет проверять на наличие ожидаемой ошибки.
Мой код и коды регрессионного теста проживает в том же проекте в ожидаемой структуре каталогов. Оба времени (для обоих тестов) данные должны были быть получены одним и тем же слушателем на стороне приложения.
Проблема:
Когда я выполняя тесты индивидуально (вручную), каждый тест проходит . Однако, если я выполню их вместе, но последовательно (например, чистая сборка), только первый тест будет пройден. 2-й тест завершается неудачно после опроса потребителем на стороне теста, и через некоторое время он перестает находить какие-либо данные.
Наблюдение:
После отладки он Похоже, с первого раза все работает отлично (производители и потребители на стороне тестирования и на стороне приложения). Однако во время 2-го теста кажется, что потребитель на стороне приложения не получает никаких данных (кажется, что производитель на стороне тестирования производит данные, но не может этого сказать наверняка) и, следовательно, нет данных выводится ошибка topi c.
То, что я пробовал до сих пор:
После исследований я понимаю, что мы попадаем в условия гонки и чтобы избежать таких предложений, как:
- используйте @ DirtiesContext (classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
- Отрывайте брокера после каждого теста (см. «.destry ()» для брокеров)
- использовать разные имена topi c для каждого теста
Я применил их все и до сих пор не могу исправить свою проблему.
Я предоставляю здесь код для прочтения. Любое понимание приветствуется.
Код для 1-го теста (путь ошибки тестирования):
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
topics = {
AdapterStreamProperties.Constants.INPUT_TOPIC,
AdapterStreamProperties.Constants.ERROR_TOPIC
},
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
"log.dir=/tmp/data/logs",
"auto.create.topics.enable=true",
"delete.topic.enable=true"
}
)
public class AbstractIntegrationFailurePathTest {
private final int retryLimit = 0;
@Autowired
protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;
//To produce data
@Autowired
protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;
//To read from output error
@Autowired
protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;
//Service to execute notification-preference
@Autowired
protected AdapterStreamProperties projectProerties;
protected void subscribe(Consumer consumer, String topic, int attempt) {
try {
embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
} catch (ComparisonFailure ex) {
if (attempt < retryLimit) {
subscribe(consumer, topic, attempt + 1);
}
}
}
}
.
@TestConfiguration
public class AdapterStreamFailurePathTestConfig {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Value("${spring.kafka.adapter.application-id}")
private String applicationId;
@Value("${spring.kafka.adapter.group-id}")
private String groupId;
//Producer of records that the program consumes
@Bean
public Map<String, Object> sendEmailCmdProducerConfigs() {
Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
return results;
}
@Bean
public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
}
@Bean
public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
return new KafkaTemplate<>(inputProducerFactory());
}
//Consumer of the error output, generated by the program
@Bean
public Map<String, Object> outputErrorConsumerConfig() {
Map<String, Object> props = KafkaTestUtils.consumerProps(
applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass()
.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass()
.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
return rpf.createConsumer(groupId, "notification-failure");
}
}
.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {
@Autowired
private DataGenaratorForErrorPath400Test datagen;
@Mock
private AdapterHttpClient httpClient;
@Autowired
private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;
@Before
public void setup() throws InterruptedException {
Mockito.when(httpClient.callApi(Mockito.any()))
.thenReturn(
new GenericResponse(
400,
TestConstants.ERROR_MSG_TO_CHK));
Mockito.when(httpClient.createURI(Mockito.any(),Mockito.any(),Mockito.any())).thenCallRealMethod();
inputProducerTemplate.send(
projectProerties.getInputTopic(),
datagen.getKey(),
datagen.getEmailCmdToProduce());
System.out.println("producer: "+ projectProerties.getInputTopic());
subscribe(outputErrorConsumer , projectProerties.getErrorTopic(), 0);
}
@Test
public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {
ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();
int attempt = 0;
int expectedRecords = 1;
do {
records = KafkaTestUtils.getRecords(outputErrorConsumer);
records.forEach(outputListOfErrors::add);
attempt++;
} while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);
//Verify the recipient event stream size
Assert.assertEquals(expectedRecords, outputListOfErrors.size());
//Validate output
}
@After
public void tearDown() {
outputErrorConsumer.close();
embeddedFailurePathKafkaBroker.destroy();
}
}
2-й тест по структуре практически одинаков. Хотя на этот раз потребитель на стороне тестирования потребляет от application-side-output-topi c (вместо ошибки topi c). И я назвал потребителей, брокера, производителя по-разному. Как:
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
topics = {
AdapterStreamProperties.Constants.INPUT_TOPIC,
AdapterStreamProperties.Constants.OUTPUT_TOPIC
},
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
"log.dir=/tmp/data/logs",
"auto.create.topics.enable=true",
"delete.topic.enable=true"
}
)
public class AbstractIntegrationSuccessPathTest {
private final int retryLimit = 0;
@Autowired
protected EmbeddedKafkaBroker embeddedKafkaBroker;
//To produce data
@Autowired
protected KafkaTemplate<PreferredMediaMsgKey,SendEmailCmd> sendEmailCmdProducerTemplate;
//To read from output regular topic
@Autowired
protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> ouputConsumer;
//Service to execute notification-preference
@Autowired
protected AdapterStreamProperties projectProerties;
protected void subscribe(Consumer consumer, String topic, int attempt) {
try {
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
} catch (ComparisonFailure ex) {
if (attempt < retryLimit) {
subscribe(consumer, topic, attempt + 1);
}
}
}
}
Пожалуйста, дайте мне знать, если я должен предоставить дополнительную информацию.,