Вы можете легко смешивать DSL и процессор API.Насколько я понимаю, вы хотели бы построить график обработки, используя оба эти метода, чтобы сделать это для DSL, вы можете вызвать StreamsBuilder::stream
, а для Processor API вы вызываете StreamsBuilder::build()
, чтобы получить Topology
, а затем применить функцию, чтобы добавить процессори т. д.
Исходный код будет выглядеть примерно так:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
EDIT1:
Вы можете создать две топологии с DSL, работающим параллельно и слушающим разные темы.Это можно сделать как @Matthias J. Sax, упомянутый с KStream::transform(...)
, KStream::transformValues(...)
и KStream::process(...)
.Код будет примерно таким:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);