Тема ввода-вывода TopologyTestDriver для Kafka Streams - PullRequest
0 голосов
/ 03 мая 2020

У меня есть модульный тест Kafka Streams, основанный на действительно отличном, надежном и удобном TopologyTestDriver:

    try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(),
            streamsConfig(Serdes.String().getClass(), SpecificAvroSerde.class))) {

        TestInputTopic<String, Event> inputTopic = testDriver.createInputTopic(inputTopicName,
                Serdes.String().serializer(), eventSerde.serializer());

        TestOutputTopic<String, Frame> outputWindowTopic = testDriver.createOutputTopic(
                outputTopicName, Serdes.String().deserializer(), frameSerde.deserializer());

        ...

     }

Я хотел бы протестировать немного более сложную настройку, где "output" topi c «вход» topi c для другой топологии.

Я могу определить несколько тем ввода и вывода внутри одной и той же топологии. Но как только я использую ту же topi c в качестве входных и выходных topi c в той же топологии, я получаю следующее исключение:

org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic events has already been registered by another source.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
    at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.writeToTopology(StreamSourceNode.java:94)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:547)

Похоже, что TopologyTestDriver не не дает возможность определить темы ввода-вывода, верно?

Обновление Чтобы лучше проиллюстрировать, чего я пытаюсь достичь:

builder.stream("input-topic, ...)..to("intermediate-topic",...);
builder.stream("intermediate-topic", ...)..to("output-topic",...);

и я хочу иметь возможность проверить (утвердить) содержимое "intermeidate-topi c" в моем модульном тесте. Btw. Я не могу «повторно использовать» результат вызова «.to ()» при построении следующей части топологии, поскольку этот метод возвращает void.

Но у меня есть только testDriver.createInputTopic() и testDriver.createOutputTopic(), и я не могу определяя что-то вроде testDriver.createInputOutputTopic().

1 Ответ

0 голосов
/ 04 мая 2020

Использование того же topi c в качестве ввода и вывода topi c должно работать. Однако вы не можете использовать один и тот же topi c в качестве входного topi c несколько раз (трассировка следов указывает, что вы пытаетесь это сделать).

Если вы хотите использовать один и тот же ввод topi c дважды, вы просто добавляете его один раз и «разветвляете»:

KStream stream = builder.stream(...);
stream.map(...); // first usage
stream.filter(...); // second usage

Использование одного и того же объекта KStream дважды, по сути, является «разветвлением» (или «трансляцией»), который отправит входные данные для обоих операторов.

...