Как переслать записи сразу на несколько дочерних процессоров Kafka Stream? - PullRequest
0 голосов
/ 01 мая 2019

Можно ли в Kafka Stream API пересылать более одной записи одновременно разным дочерним процессорам? Например, допустим, у нас есть родительский процессор с именем Processor-Parent и два дочерних процессора: Child-1, Child-2.

Когда Processor-Parent получает запись для обработки, я хотел бы сделать следующее.

new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))

Это хорошая практика для пересылки таких записей?

Ответы [ 2 ]

1 голос
/ 02 мая 2019

Это зависит от ваших требований:

  • Если ваша логика проста, вы даже можете использовать Kafka Streams DSL.

  • Если это немного более сложно и вам нужен Procesor API, но вы хотите передать одинаковые записи двум процессорам, вы можете сделать это, как упомянуто @Sameer Killamsetty.

builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
  • Если это более сложно и зависит от некоторой логики в процессоре, вы хотите передать сообщение другому узлу процессора, вы можете сделать это.
builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
public class InputProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        try {
            context().forward(key, Integer.parseInt(value), To.child("child1"));
            context().forward(key, value, To.child("child2"));
        }
        catch (NumberFormatException nfe) {
            context().forward(key, value, To.child("child2"));
        }
    }
}
0 голосов
/ 02 мая 2019

Это не лучшая практика.Вместо этого создайте свою топологию с одним родительским процессором и несколькими дочерними процессорами.

builder = new TopologyBuilder();
    builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(),SOURCE)
.addProcessor("child2", () -> new child2(),SOURCE);

таким образом потоки kafka гарантируют, что каждое сообщение, поступающее в источник, отправляется обоим дочерним процессорам.

...