весеннее интеграционное тестирование Kafka со встроенным Kafka - PullRequest
0 голосов
/ 18 февраля 2019

У меня есть приложение весенней загрузки, которое потребляет потребителя из темы в одном кластере и производит в другую тему в другом кластере.

Теперь я пытаюсь написать интеграционный тестовый пример с использованием встроенного Spring Kafka, но у меня естьвыпуск KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource

класс потребителей

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
    pro.forEach(kafkaProducerService::produce);

   }

}

класс производителей

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
    kafkaTemplate.send(topic,"professor",pro);
  }

 }

В моих тестовых случаях я хочупереопределить KafkaTemplate, чтобы при вызове метода kafkaConsumerService.professor в Test он выводил данные во встроенный Kafka и проверял их.

Test config

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

@Autowired
 KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   }

 }

Тестовый класс

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception {
     kafkaConsumerService.professor(Arrays.asList(new Professor()));

     //How to check messages is sent to kafka?
}

 }

Ошибка

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

А также кто-нибудь может мне помочь, какпроверять сообщения, отправленные на встроенный сервер Kafka?

Примечание У меня есть некоторые устаревшие предупреждения

Тип KafkaEmbedded устарел

МетодgetPartitionsPerTopic () из типа KafkaEmbedded устарела

Метод generatorProps (Kaf)kaEmbedded) типа KafkaTestUtils устарела

1 Ответ

0 голосов
/ 19 февраля 2019

Boot 2.1 по умолчанию отключает переопределение bean .

Переопределение bean по умолчанию отключено, чтобы предотвратить случайное переопределение bean.Если вы полагаетесь на переопределение, вам нужно установить spring.main.allow-bean-definition-overriding на true.

Относительно амортизации;см. Javadocs для @EmbeddedKafka.Заменяется на EmbeddedKafkaBroker.

...