Spring Cloud Stream Kafka Streams Binder 3.x: Нет вывода на второй вывод topi c в случае нескольких привязок вывода - PullRequest
0 голосов
/ 10 июля 2020

У меня есть следующая сигнатура метода bean-компонента процессора:

@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, intputStream2) -> {

        intputStream2
            .peek((k, v) -> {
                log.debug(...);
            });

        return inputStream1
            .mapValues(...)
            .branch((k,v) -> true, (k,v) -> true);

    };
}

Соответствующие свойства:

spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1

Spring Cloud Kafka Stream версия Hoxton.SR4 (spring-cloud-stream-binder -kafka-streams: jar: 3.0.4.RELEASE), встроенный Kafka версии 2.5.0.

Я тестирую свою топологию с помощью встроенного Kafka:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
            "inp0", "inp1", "out0", "out1"
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");

        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");

        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);

        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));

        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));

    }

private <K,V> Consumer<K, V> createConsumer(String groupName) {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}

Мои тесты показывают, что сообщения от myStream достигают и попадают в topi c "out0", как и ожидалось, но "out1" topi c остается пустым, и модульный тест не выполняется при втором утверждении.

Я пробовал пару вещей, но похоже, что вывод на второй вывод topi c просто не производится (вывод на первый вывод topi c производится хорошо).

Вы видите какие-нибудь ошибки в моей настройке?

И еще одно: оператор return в определении метода bean-компонента myStream показывает предупреждение компилятора:

Непроверенное создание массива универсальных шаблонов для параметра varargs

Но похоже, что ' s как API-интерфейс Spring Cloud Kafka Stream 3.x требует определения возвращаемого типа?

1 Ответ

1 голос
/ 10 июля 2020

Вы передаете два предиката методу branch, и оба они всегда оцениваются как true. Первый предикат всегда побеждает и производит данные для первой выходной привязки. Вызов метода ветвления завершается после того, как первый предикат оценивается как истина. См. javado c для получения более подробной информации. Вам следует использовать разные предикаты (возможно, проверяя определенные условия для ключа / значения). Если первый предикат завершился ошибкой, а второй - успешным, вы увидите данные, полученные для второго вывода topi c.

Что касается этого предупреждения компилятора, я думаю, вы можете спокойно игнорировать это, поскольку API будет убедитесь, что объекты предиката, переданные в вызов branch, будут иметь правильный тип. Поскольку реализация метода использует generi c varargs, вы получите это исключение. См. этот поток для получения подробной информации об этом предупреждении компилятора.

...