Я добавил несколько процессоров в свою топологию. Работает только 1 процессор. Я могу получать сообщения из topic1, topic2 и обрабатывать их с помощью Processor1 и выводить prod в topic3-sink. Processor2 не вызывается в моем коде.
@Component
public class CustomTopology {
@PostConstruct
public void createTopology() {
Topology topology = new Topology();
topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1", "topic2");
topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");
topology.addProcessor("processor2", new CustomProcessorSupplier2(), "processor1");
topology.addSink("sink", "topic3-sink", "source", "processor2");
Properties p = new Properties();
p.put(APPLICATION_ID_CONFIG, "stream1");
p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
KafkaStreams streams = new KafkaStreams(topology, p);
streams.start();
}
}
class CustomProcessorSupplier1 implements ProcessorSupplier<Integer, Message> {
@Override
public Processor<Integer, Message> get() {
Processor<Integer, Message> processor = new Processor<Integer, Message>() {
...
@Override
public void process(Integer key, Message value) {
value.setName(value.getName() + "ProcessB");
}
...
};
return processor;
}
}
class CustomProcessorSupplier2 implements ProcessorSupplier<Integer, Message> {
@Override
public Processor<Integer, Message> get() {
Processor<Integer, Message> processor = new Processor<Integer, Message>() {
...
@Override
public void process(Integer key, Message value) {
value.setName(value.getName() + "ansalProcess");
}
...
};
return processor;
}
}