KafkaStreams добавляет более 1 процессора в топологии не работает - PullRequest
1 голос
/ 26 мая 2019

Я добавил несколько процессоров в свою топологию. Работает только 1 процессор. Я могу получать сообщения из topic1, topic2 и обрабатывать их с помощью Processor1 и выводить prod в topic3-sink. Processor2 не вызывается в моем коде.

@Component
public class CustomTopology {
    @PostConstruct
    public void createTopology() {
        Topology topology = new Topology();
        topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1", "topic2");
        topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");
        topology.addProcessor("processor2", new CustomProcessorSupplier2(), "processor1");
        topology.addSink("sink", "topic3-sink", "source", "processor2");
        Properties p = new Properties();
        p.put(APPLICATION_ID_CONFIG, "stream1");
        p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
        p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
        p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
        KafkaStreams streams = new KafkaStreams(topology, p);
        streams.start();
    }
}

class CustomProcessorSupplier1 implements ProcessorSupplier<Integer, Message> {

    @Override
    public Processor<Integer, Message> get() {
        Processor<Integer, Message> processor = new Processor<Integer, Message>() {
            ...

            @Override
            public void process(Integer key, Message value) {
                value.setName(value.getName() + "ProcessB");
            }

            ...
        };
        return processor;
    }
}

class CustomProcessorSupplier2 implements ProcessorSupplier<Integer, Message> {

    @Override
    public Processor<Integer, Message> get() {
        Processor<Integer, Message> processor = new Processor<Integer, Message>() {
           ...

            @Override
            public void process(Integer key, Message value) {
                value.setName(value.getName() + "ansalProcess");
            }

            ...
        };
        return processor;
    }
}

1 Ответ

2 голосов
/ 26 мая 2019

Чтобы передать запись вперед в Процессоре, вы должны позвонить ProcessorContext::forward. Этот метод перегружен. Вы можете переслать все сообщения всем следующим узлам, но вы также можете выбрать подмножество узлов, на которые будет пересылаться сообщение.

Вы должны добавить к своему Processor s импепментацию (CustomProcessorSupplier1 и CustomProcessorSupplier1) context().forward(someKey, someValue);

Ваши процессоры не выдают сообщение на topic3-sink. Сообщения появляются там, потому что один из родительских узлов - SourceNode (входные темы):

topology.addSink("sink", "topic3-sink", "source", "processor2");
...