Как выполнить тестирование Spring Cloud Stream с использованием Kafka DSL - PullRequest
0 голосов
/ 31 мая 2018

Я пытаюсь (юнит) протестировать процессор Spring Cloud Stream Kafka, который использует Kafka DSL, но получает следующую ошибку "Connection to node -1 could not be established. Broker may not be available.".Кроме того, тест не выключается.Я пробовал и EmbeddedKafka, и TestBinder, и все же у меня такое же поведение.Я попытался начать с ответа , предоставленного Spring Cloud Team (который работает), и я адаптировал приложение для использования Kafka DSL и оставил тестовый класс почти как есть.Действительно ли EmbeddedKafka поддерживает DSL Kafka?

Я использую Elmhurst.RELEASE

@SpringBootApplication
@EnableBinding(MyBinding.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    @StreamListener
    @SendTo(MyBinding.OUTPUT)
    public KStream<String,String> process(@Input(MyBinding.INPUT) KStream<String, String> in) {

        return in.peek((k,v) -> System.out.println("Received value " +v ))
                .mapValues(v -> v.toUpperCase());
    }
}

interface MyBinding {

    String INPUT = "input";
    String OUTPUT = "output";

    @Input(INPUT)
    KStream<String, String> messagesIn();

    @Output(OUTPUT)
    KStream<String, String> messagesOut();
} 

Обновление

Как показано в следующем примереподход, предложенный в этот ответ , работал для меня, когда я использую общий синтаксис Spring Cloud Stream для написания обработчиков событий, но не работал, когда я использую Kafka DSL (KStreams).Чтобы увидеть разницу в поведении, просто переключитесь на ExampleAppWorking или ExampleAppNotWorking в аннотации @SpringBootTest:

@RunWith(SpringRunner.class)
@SpringBootTest(classes=ExampleKafkaEmbeddedTest.ExampleAppNotWorking.class)
@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD)
public class ExampleKafkaEmbeddedTest {
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, "so0544in","so0544out");

    @Autowired
    private KafkaTemplate<Integer, byte[]> template;

    @Autowired
    private KafkaProperties properties;

    private static Consumer consumer;

    @BeforeClass
    public static void setup() throws Exception{
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
        System.setProperty("server.port","0");
        System.setProperty("spring.jmx.enabled" , "false");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);

        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        consumer = cf.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "so0544out");

    }

    @After
    public void tearDown() {
        if (consumer != null){
            consumer.close();
        }
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());

        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "so0544out");

        System.out.println("Contenu chaine resultat : " + cr.value());

        assertEquals(cr.value(), "FOO");
    }

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public static class ExampleAppWorking {

        public static void main(String[] args) {
            SpringApplication.run(ExampleAppWorking.class, args);
        }

        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public String receive(String in) {
            return in.toUpperCase();
        }
    }

    @SpringBootApplication
    @EnableBinding(MyBinding.class)
    public static class ExampleAppNotWorking {

        public static void main(String[] args) {
            SpringApplication.run(ExampleAppNotWorking.class, args);
        }

        @StreamListener
        @SendTo(MyBinding.OUTPUT)
        public KStream<Integer,byte[]> toUpperCase (@Input(MyBinding.INPUT) KStream<Integer,byte[]> in){
            return in.map((key, val) -> KeyValue.pair(key, new String(val).toUpperCase().getBytes()));
        }
    }

    public interface MyBinding {
        String INPUT = "input";
        String OUTPUT = "output";

        @Input(INPUT)
        KStream<Integer, String> messagesIn();

        @Input(OUTPUT)
        KStream<Integer, String> messagesOut();
    }

}

1 Ответ

0 голосов
/ 31 мая 2018

EmbeddedKafa должно работать с Kafka Streams.Все эти тесты используют EmbeddedKafa для тестирования.Вы можете использовать шаблоны, используемые в этих тестах, в качестве шаблона для собственного тестирования.

Посмотрел код, который вы указали ниже в комментариях.Вам необходимо добавить это свойство в свой метод setup.

System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());

Основное приложение Spring Boot ожидает, что брокер Kafka доступен на localhost, и не знает, чтоТест выполняется встроенным брокером.Нам нужно сделать этот факт явным, установив это свойство из теста, чтобы основное загрузочное приложение правильно обнаруживало встроенный брокер kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...