Если вам нужен такой низкоуровневый подход, вы можете построить всю топологию самостоятельно:
Topology topology = builder.build();
topology.addSource("inputNode","input");
topology.addProcessor("inProcessor", InputProcessor::new, "inputNode");
topology.addSink("sink1",
(k, v, rc) -> "topic1",
new StringSerializer(),
new IntegerSerializer(),
"inProcessor");
topology.addSink("sink2",
(k, v, rc) -> "topic2",
new StringSerializer(),
new StringSerializer(),
"inProcessor");
InputProcessor зависит от бизнес-логики, создающей различные типы объектов и передающей их различным узлам приемника (темам).
Пример примера имеет следующую логику:
- , если значение сообщения может быть проанализировано в целое число, переслать его на два узла приемника (сток1, приемник2), в приемник1 как
Integer
в сток2 как String
. - , если не пересылать сообщение только в сток2.
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("sink1"));
context().forward(key, value, To.child("sink2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("sink2"));
}
}
}