Как метод process () в kafka-stream-processor вызывается автоматически? - PullRequest
0 голосов
/ 04 июня 2019

Я изучаю потоки кафки и написал простое приложение, фрагмент которого приведен ниже:

MainApp:

        Topology topology = new Topology();

        topology.addSource("SOURCE", "source-topic");
        topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
        topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
        topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
        topology.addSink("SINK", "sink-topic", "Processor3");

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

Фрагмент отдельного обработчика потока:

public class Processor1 implements Processor<String, String> {

   // Rest of code

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }

Я понял, что нам нужно создать Topology, а затем, чтобы его инициировать, мы вызываем streams.start();

Я не могу понять, как метод process()вызывается автоматически и кто его вызывает?

1 Ответ

1 голос
/ 04 июня 2019

Processor process() метод, вызываемый классом ProcessorContextImpl автоматически при каждом входящем сообщении для определенного узла топологии. Для вашей встроенной топологии, когда сообщение поступает во входящую тему, узел SOURCE использует его и пересылает (распространяет) сообщение на дочерний узел, внутренне вызывая метод forward (вы можете отладить / посмотреть код из класса ProcessorContextImpl). В вашем случае узел SOURCE пересылает ключ и значение на дочерний узел Processor1. После этого сработал метод process() из класса Processor1. Когда код достигает context.forward(), сообщение пересылается на следующий дочерний узел, Processor2. После этого сообщение распространяется на узлы Processor3 и SINK аналогичным образом, и, наконец, сообщение создается в исходящей теме. Такой конвейер для конкретного сообщения выполняется в одном потоке (и если у вас есть значение по умолчанию для config num.stream.threads = 1, все сообщения будут обрабатываться в одном потоке для экземпляра приложения).

...