Spring Cloud Stream (Hoxton) Производитель / Потребитель Kafka не работает в интеграционных тестах с EmbeddedKafka - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть работающее приложение, которое использует последнее обновление для производителей, поставляемое с Hoxton. Сейчас я пытаюсь добавить несколько интеграционных тестов, утверждая, что продюсер действительно выдает сообщение, как и ожидалось. Проблема в том, что потребитель, которого я использую в тесте, никогда не читает ничего из topi c.

. Чтобы сделать эту проблему воспроизводимой, я повторно использовал проект (spring-cloud-stream-samples/source-samples/dynamic-destination-source-kafka) из весеннего облачного потока. примеры, адаптируя его следующим образом:

DynamicDestinationSourceApplication (EmitterProcessor теперь является компонентом)


@SpringBootApplication
@RestController
public class DynamicDestinationSourceApplication {

    @Autowired
    private ObjectMapper jsonMapper;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    public static void main(String[] args) {
        SpringApplication.run(DynamicDestinationSourceApplication.class, args);
    }

    @SuppressWarnings("unchecked")
    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destinationName = payload.get("id");
        Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader("spring.cloud.stream.sendto.destination", destinationName).build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<?>>> supplier() {
        return () -> processor;
    }

    @Bean
    public EmitterProcessor<Message<?>> processor(){
        return EmitterProcessor.create();
    }

    //Following sink is used as test consumer. It logs the data received through the consumer.
    static class TestSink {

        private final Log logger = LogFactory.getLog(getClass());

        @Bean
        public Consumer<String> receive1() {
            return data -> logger.info("Data received from customer-1..." + data);
        }

        @Bean
        public Consumer<String> receive2() {
            return data -> logger.info("Data received from customer-2..." + data);
        }
    }
}

ModuleApplicationTests

@EmbeddedKafka
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DynamicDestinationSourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

    private static String TOPIC = "someTopic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    @Test
    public void shouldProduceAndConsume() {

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(Collections.singleton(TOPIC));
        consumer.poll(0);

        Message<?> message = MessageBuilder.withPayload(new HashMap<String,String>(){{put("somekey", "somevalue");}})
                .setHeader("spring.cloud.stream.sendto.destination", TOPIC).build();
        processor.onNext(message);

        ConsumerRecord<String, String> someRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
        System.out.println(someRecord);

    }

}

Он заканчивается No records found for topic. Почему это не работает во время тестов?

ОБНОВЛЕНИЕ:

Мой реальный проект не ведет себя точно так же, как проект выше, что я вижу, что emitterProcessor.onNext() не заканчивает тем, что вызывает AbstractMessageHandler.onNext()

Отладка в emitterProcessor.onNext() Я видел, что он вызывает drain(), а у FluxPublish.PubSubInner<T>[] a = subscribers; подписчики - пустой массив, тогда как в обычном исполнении приложения он содержит EmitterProcessor .

1 Ответ

1 голос
/ 03 февраля 2020

Я неправильно добавил testImplementation("org.springframework.cloud:spring-cloud-stream-test-support") в качестве зависимости. При этом используется Test Binder, который не предназначен для использования с интеграционными тестами.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...