Понимание количества созданных экземпляров StreamProcessor и совместное использование потоковой задачи одного и того же экземпляра streamprocessor? - PullRequest
2 голосов
/ 09 февраля 2020

Я хочу немного подробнее понять взаимосвязь между StreamThread, StreamTask и количеством экземпляров StreamProcessor, когда у нас есть:

  • источник kafka topi c с несколькими разделами, скажем 6.
  • Я храню только ONE StreamThread (num.stream.threads = 1)

Я храню простая топология процессора:

source_topi c -> Processor1 -> Processor2 -> Processo3 -> sink_topi c

Каждый процессор просто переходит к следующему процессору в цепочке. Фрагмент одного из процессоров. Я использую API-интерфейс низкого уровня Java.

public class Processor1 implements Processor<String, String> {

    private ProcessorContext context;
    public Processor1() {

    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context
    }

    @Override
    public void punctuate(long timestamp) {
        // TODO Auto-generated method stub
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }
}

Фрагмент приложения основного драйвера:

Topology topology = new Topology();

topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");

Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

При таком расположении у меня возникают следующие вопросы:

  • Сколько экземпляров процессоров (Processor1, Processor2, Processor3) будет создано?
  • Насколько я понимаю, будет SIX stream tasks. Создается ли новый экземпляр процессора для каждого Stream task или они «разделяют» одно и то же Processor instance?
  • Когда создается Stream Thread, создается ли новый экземпляр processor?
  • Stream Tasks созданы как часть Stream Threads создания?

(Новый вопрос добавлен в исходный список)

  • В этом сценарии single stream thread будет иметь SIX stream tasks. stream thread выполняет эти stream tasks один за другим, что-то вроде "in-al oop". Do stream tasks запустить как отдельный "поток". По сути, не в состоянии понять, как single stream thread запускает несколько stream tasks одновременно / параллельно?

Может кто-нибудь, пожалуйста, помогите понять.

Ниже приведено топология, которая печатается:


KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
    StreamsThread appId: my-first-streams-application
        StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
        StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
        Active tasks:
            Running:                                StreamsTask taskId: 0_0
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-0]
                                StreamsTask taskId: 0_1
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-1]
                                StreamsTask taskId: 0_2
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-2]
                                StreamsTask taskId: 0_3
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-3]
                                StreamsTask taskId: 0_4
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-4]
                                StreamsTask taskId: 0_5
                                            ProcessorTopology:
                            SOURCE:
                                topics:     [source-topic-data]
                                children:   [Processor1]
                            Processor1:
                                children:   [Processor2]
                            Processor2:
                                children:   [Processor3]
                            Processor3:
                                children:   [SINK]
                            SINK:
                                topic:      sink-topic-data
                    Partitions [source-topic-data-5]

            Suspended:
            Restoring:
            New:
        Standby tasks:
            Running:
            Suspended:
            Restoring:
            New:


1 Ответ

2 голосов
/ 10 февраля 2020

Сколько экземпляров процессоров (Процессор1, Процессор2, Процессор3) будет создано?

В вашем примере по шесть каждого. Каждое задание будет создавать полную копию Topology. (см. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355; примечание: Topology является логическим представлением программы и создается как ProcessorTopology во время выполнения)

Согласно моему понимание, будут задачи потока SIX. Создан ли новый экземпляр процессора для каждой задачи Stream или они «совместно используют» один и тот же экземпляр Processor?

Каждая задача имеет свой собственный экземпляр Processor - они не являются общими.

При создании потокового потока создается ли новый экземпляр процессора?

Нет. Когда задача создается, она создает новые Processor экземпляров.

Создаются ли потоковые задачи как часть создания потоковых потоков?

Нет. Задачи создаются во время перебалансировки в соответствии с назначением раздела / задачи. KafkaStreams регистрирует StreamsRebalanceListener на своем внутреннем спутнике, который вызывает TaskManager#createTasks()

Update (по мере продления вопроса):

В этом сценарии один поток поток будет иметь задачи потока SIX. Выполняет ли поток потока эти потоковые задачи один за другим, что-то вроде «in-al oop». У потоковых задач запускается как отдельный «поток». По сути, не в состоянии понять, как один поток выполняет одновременно несколько потоковых задач / параллельно?

Да, StreamsThread будет выполнять задачи в al oop. Других тем нет. Следовательно, задачи, которые назначены одному и тому же потоку, выполняются не в одно и то же время / параллельно, а одна за другой (ср. https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472 - каждый StreamThread использовал ровно один TaskManager который использует AssignedStreamsTasks и AssignedStandbyTasks внутри.)

...