Это зависит от ваших требований:
Если ваша логика проста, вы даже можете использовать Kafka Streams DSL.
Если это немного более сложно и вам нужен Procesor API, но вы хотите передать одинаковые записи двум процессорам, вы можете сделать это, как упомянуто @Sameer Killamsetty.
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
- Если это более сложно и зависит от некоторой логики в процессоре, вы хотите передать сообщение другому узлу процессора, вы можете сделать это.
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("child1"));
context().forward(key, value, To.child("child2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("child2"));
}
}
}