Kafka Streams: несколько топологий в одном приложении - PullRequest
0 голосов
/ 20 апреля 2019

Я хотел бы использовать как Processor API, так и DSL в одном приложении потоков Kafka. Кроме того, как создать и запустить несколько топологий в одном приложении (например, 1 с использованием API процессора и другое с использованием DSL.)

1 Ответ

3 голосов
/ 21 апреля 2019

Вы можете легко смешивать DSL и процессор API.Насколько я понимаю, вы хотели бы построить график обработки, используя оба эти метода, чтобы сделать это для DSL, вы можете вызвать StreamsBuilder::stream, а для Processor API вы вызываете StreamsBuilder::build(), чтобы получить Topology, а затем применить функцию, чтобы добавить процессори т. д.

Исходный код будет выглядеть примерно так:

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

EDIT1:

Вы можете создать две топологии с DSL, работающим параллельно и слушающим разные темы.Это можно сделать как @Matthias J. Sax, упомянутый с KStream::transform(...), KStream::transformValues(...) и KStream::process(...).Код будет примерно таким:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);
...