API-интерфейс процессора потоков Kafka context.forward - PullRequest
0 голосов
/ 23 февраля 2019

Для входящей записи мне нужно проверить значение и на основе результирующего объекта мне нужно перенаправить ошибку в разные темы и, если она была успешно подтверждена, переслать то же самое с помощью context.forward ().Это можно сделать с помощью DSL, как указано в этой ссылке

с использованием kafka-streams для условной сортировки входного потока json

Я не нашел четкого способа сделать этов процессоре API.

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}

Теперь вызывающему абоненту снова нужно проверить и по ключу нужно дифференцировать тему приемника.Я использую processorAPI, потому что мне нужны заголовки.

Редактировать:

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;

Когда условие ложно, как передать в другой поток.В настоящее время создается другой предикат, который собирает все другие записи, которые не удовлетворяют вышеуказанному предикату в цепочке.Есть ли способ сделать в том же предикате?

1 Ответ

0 голосов
/ 23 февраля 2019

Когда вы указываете Topology, вы назначаете имена всем узлам и подключаете их:

Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z

Если процессор «X» подключен к нисходящим процессорам «Y» и «Z», выМожно использовать имя узла для отправки записи в «Y» или «Z».Если вы не укажете имя, запись будет отправлена ​​нижестоящим («дочерним») процессорам.

// this is `process()` of "X"
public void process(String key, String value) {
    context.forward(newKey, newValue); // send to both Y and Z
    context.forward(newKey, newValue, To.child("Y")); // send it only to Y
    context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}
...