Запись в тему в процессоре с использованием Kafka Stream DSL - PullRequest
0 голосов
/ 17 апреля 2019

Мне нужно использовать API Kafka Sreams вместе с процессором API. Я также хочу написать различные типы объектов в различные темы в рамках моей реализации процессора, т. Е. Испускать различные объекты в процессе & пунктуации . Я видел, что существует KIP-313 flatTransform , который, вероятно, решит мою проблему.

Если я использую:

inputStream.process(processorSupplier,,)

Так как это «завершающая» операция (тип возвращаемого значения void), могу ли я использовать в своем процессоре внутреннего производителя Kafka. Я не видел такой реализации, это разумный подход, есть ли побочные эффекты?

1 Ответ

0 голосов
/ 18 апреля 2019

Если вам нужен такой низкоуровневый подход, вы можете построить всю топологию самостоятельно:

Topology topology = builder.build();
topology.addSource("inputNode","input");
topology.addProcessor("inProcessor", InputProcessor::new, "inputNode");
topology.addSink("sink1",
    (k, v, rc) -> "topic1",
    new StringSerializer(),
    new IntegerSerializer(),
    "inProcessor");
topology.addSink("sink2",
    (k, v, rc) -> "topic2",
    new StringSerializer(),
    new StringSerializer(),
    "inProcessor");

InputProcessor зависит от бизнес-логики, создающей различные типы объектов и передающей их различным узлам приемника (темам).

Пример примера имеет следующую логику:

  • , если значение сообщения может быть проанализировано в целое число, переслать его на два узла приемника (сток1, приемник2), в приемник1 как Integerв сток2 как String.
  • , если не пересылать сообщение только в сток2.
public class InputProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        try {
            context().forward(key, Integer.parseInt(value), To.child("sink1"));
            context().forward(key, value, To.child("sink2"));
        }
        catch (NumberFormatException nfe) {
            context().forward(key, value, To.child("sink2"));
        }
    }
}
...