Я изучаю потоки кафки и написал простое приложение, фрагмент которого приведен ниже:
MainApp:
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic", "Processor3");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
Фрагмент отдельного обработчика потока:
public class Processor1 implements Processor<String, String> {
// Rest of code
@Override
public void process(String key, String value) {
System.out.println("Inside Processor1#process() method");
context.forward(key, value);
}
Я понял, что нам нужно создать Topology
, а затем, чтобы его инициировать, мы вызываем streams.start();
Я не могу понять, как метод process()
вызывается автоматически и кто его вызывает?