KStream с Testbinder - весенний облачный поток Кафка - PullRequest
0 голосов
/ 30 августа 2018

Недавно я начал изучать Spring Cloud Stream для Kafka и изо всех сил пытался заставить TestBinder работать с Kstreams. Это известное ограничение или я что-то упустил?

Это прекрасно работает:

Строковый процессор:

@StreamListener(TopicBinding.INPUT)
@SendTo(TopicBinding.OUTPUT)
public String process(String message) {
    return message + " world";
}

Струнный тест:

  @Test
  @SuppressWarnings("unchecked")
  public void testString() {
    Message<String> message = new GenericMessage<>("Hello");
    topicBinding.input().send(message);
    Message<String> received = (Message<String>) messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
  }

Но когда я пытаюсь использовать KStream в своем процессе, я не могу заставить TestBinder работать.

процессор Kstream:

  @SendTo(TopicBinding.OUTPUT)
  public KStream<String, String>  process(
      @Input(TopicBinding.INPUT) KStream<String, String> events) {
    return events.mapValues((value) -> value + " world");
  }

Тест KStream:

@Test
@SuppressWarnings("unchecked")
public void testKstream() {
    Message<String> message = MessageBuilder
      .withPayload("Hello")
      .setHeader(KafkaHeaders.TOPIC, "event.sirism.dev".getBytes())
      .setHeader(KafkaHeaders.MESSAGE_KEY, "Test".getBytes())
      .build();
    topicBinding.input().send(message);
    Message<String> received = (Message<String>) 
    messageCollector.forChannel(topicBinding.output()).poll();
    assertThat(received.getPayload(), equalTo("Hello world"));
 }

Как вы, возможно, заметили, я опустил @StreamListener из процессора Kstream, но без него не похоже, что тестбиндер может найти обработчик. (но при этом он не работает при запуске приложения)

Это известная ошибка / ограничение, или я просто делаю что-то глупое здесь?

1 Ответ

0 голосов
/ 30 августа 2018

Связыватель теста предназначен только для связывателей на основе MessageChannel (подклассы AbstractMessageChannelBinder). KStreamBinder не использует MessageChannel с.

Вы можете проводить тестирование с использованием реального связующего и встроенного брокера kafka, предоставляемого модулем spring-kafka-test.

Также см. этот выпуск .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...