У меня есть модульный тест Kafka Streams, основанный на действительно отличном, надежном и удобном TopologyTestDriver:
try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(),
streamsConfig(Serdes.String().getClass(), SpecificAvroSerde.class))) {
TestInputTopic<String, Event> inputTopic = testDriver.createInputTopic(inputTopicName,
Serdes.String().serializer(), eventSerde.serializer());
TestOutputTopic<String, Frame> outputWindowTopic = testDriver.createOutputTopic(
outputTopicName, Serdes.String().deserializer(), frameSerde.deserializer());
...
}
Я хотел бы протестировать немного более сложную настройку, где "output" topi c «вход» topi c для другой топологии.
Я могу определить несколько тем ввода и вывода внутри одной и той же топологии. Но как только я использую ту же topi c в качестве входных и выходных topi c в той же топологии, я получаю следующее исключение:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic events has already been registered by another source.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.writeToTopology(StreamSourceNode.java:94)
at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:547)
Похоже, что TopologyTestDriver не не дает возможность определить темы ввода-вывода, верно?
Обновление Чтобы лучше проиллюстрировать, чего я пытаюсь достичь:
builder.stream("input-topic, ...)..to("intermediate-topic",...);
builder.stream("intermediate-topic", ...)..to("output-topic",...);
и я хочу иметь возможность проверить (утвердить) содержимое "intermeidate-topi c" в моем модульном тесте. Btw. Я не могу «повторно использовать» результат вызова «.to ()» при построении следующей части топологии, поскольку этот метод возвращает void.
Но у меня есть только testDriver.createInputTopic()
и testDriver.createOutputTopic()
, и я не могу определяя что-то вроде testDriver.createInputOutputTopic()
.