Как настроить порт в KafkaEmbedded при юнит-тестировании Spring-Kafka-потребителя - PullRequest
2 голосов
/ 20 мая 2019

Я использую spring-boot-starter-parent версию 1.5.0.RELEASE, spring-kafka версию 1.0.0.RELEASE и spring-kafka-test версию 1.0.0.RELEASE в приложении, которое принимает сообщения из кластера Kakfa 0.9.У меня есть модульный тест для моего потребителя, который использовал KafkaEmbedded, но он не проходит, так как порт брокера выбирается случайно.Есть ли способ установить свойство этого брокера без изменения версии?Или какие версии я должен использовать, чтобы ничего не сломать?

Вот код для KafkaListener и KafkaConsumerTest.

Listener.java

@Service
public class Listener {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
        logger.info(msg);
        latch.countDown();
        ack.acknowledge();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

KafkaConsumerTest.java ( РЕДАКТИРОВАТЬ )

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private Listener listener;

    @Before
    public void init(){
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
        senderProps.put("key.serializer", StringSerializer.class);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
        template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEST_TOPIC);
    }

    @Test
    public void testConsume() throws Exception {
        String record = "message";
        template.sendDefault(TEST_TOPIC, record);
        logger.debug("test-consume sent record {}", record);
        listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
        Assert.assertEquals(listener.getLatch().getCount(), 0);
    }
}

Ответы [ 2 ]

0 голосов
/ 21 мая 2019

Я думаю, что в то время как контекст приложения загружается для тестирования, два компонента создаются типа (ProducerFactory and KafkaTemplate), один с исходными конфигурациями, а второй с тестовыми конфигурациями, попробуйте использовать другой профиль для тестов application-test.yml и добавьте переопределение bean-компонентов свойство

spring.main.allow-bean-definition-overriding to true.

Чтобы он переопределял компоненты приложения с помощью тестовых компонентов, а также объявлял ProducerFactory и KafkaTemplate как компоненты в тесте с тем же именем, что и в приложении

0 голосов
/ 20 мая 2019

Пожалуйста, используйте spring-kafka 1.3.9 с загрузкой 1.5;более ранние версии больше не поддерживаются.Текущая версия boot 1.5.x - 1.5.21.

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
    embeddedKafka.setKafkaPorts(1234);
}

setKafkaPorts доступна с версии 1.3.

Однако вы правильно используете выделенный случайный порт в своем тесте

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

Чтобы подключить слушателя kafka к встроенному брокеру, вы можете использовать.

    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); 
...