Получить данные из темы после нажатия на @EmbeddedKafka в весенней загрузке Junit - PullRequest
0 голосов
/ 31 января 2019

Я пишу тестовые примеры Junit (используя @EmbeddedKafka) для моего приложения Spring Boot, которое широко использует Spring-kafka для связи с другими сервисами и для других операций.

Один типичный случай - удаление данных из kafka.(что мы делаем с отправкой сообщения null в kafka).

В настоящее время в методе delete () мы делаем это, сначала проверяя, существует ли какое-либо сообщение в kafka, которое запрашивается какудален.Затем мы нажимаем null для этого ключа сообщения в Kafka

Шаги, сопровождаемые написанием Junit для вышеуказанной логики метода.

@Test
public void test(){
   //Push a message to Kafka (id=1234)
   //call test method service.delete(1234);
       //internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
  //check delete topic for delete message received.
  // Assertions
}

Проблема здесь в том, что Кафка всегда выдает исключение не найденное сообщение.внутри метода service.delete ().

при проверке журналов в консоли.я выяснил, что мой производитель-конфиг использует другой порт для kafka, а потребительский конфиг использует другой порт.

Я не уверен, пропустил ли я какие-то мелкие детали или в чем причина такого поведения.Любая помощь будет оценена.

1 Ответ

0 голосов
/ 31 января 2019

У меня есть это простое приложение Spring Boot для рассмотрения:

@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {

    public static final String MY_TOPIC = "myTopic";

    public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
    }

    @KafkaListener(topics = MY_TOPIC)
    public void listener(String payload) {
        this.kafkaMessages.add(payload);
    }

}

application.properties:

spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest

И тест:

@RunWith(SpringRunner.class)
@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {

    @Autowired
    private KafkaTemplate<Object, String> kafkaTemplate;

    @Autowired
    private SpringBootEmbeddedKafkaApplication kafkaApplication;

    @Test
    public void testListenerWithEmbeddedKafka() throws InterruptedException {
        String testMessage = "foo";
        this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);

        assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
    }

}

Обратите внимание на spring.kafka.consumer.auto-offset-reset=earliest, позволяющий потребителю читать с начала раздела.

Еще одна важная опция, которую можно применить в тесте:

@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")

@EmbeddedKafkaзаполняет системное свойство spring.embedded.kafka.brokers и делает автоконфигурацию Spring Boot о том, что нам нужно скопировать его значение в свойство конфигурации spring.kafka.bootstrapServers.

или другой вариант в соответствии с нашими документами :

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
...