В моем преобразовании на основе DSL у меня есть поток -> ветвь, где я хочу, чтобы разветвленный вывод перенаправлялся на несколько тем.
Текущий метод branch.to()
принимает только String
.
Есть ли какая-нибудь простая опция с stream.branch
, где я могу перенаправить результат на несколько тем. Вместе с потребителем я могу подписаться на несколько тем, предоставив массив строк в качестве тем.
Моя проблема требует от меня выполнения нескольких действий, если конкретный предикат удовлетворяет запросу.
Я пробовал с stream.branch[index].to(string)
, но этого недостаточно для моего требования. Я ищу что-то вроде stream.branch[index].to(string array of topics)
или stream.branch[index].to(string)
.
Я ожидаю, что branch.to
метод с несколькими темами или есть какой-нибудь альтернативный способ добиться того же с потоками?
добавление примера кода. Удаляются фактические имена переменных.
Мои предикаты
Predicate <String, MyDomainObject> Predicate1 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
Predicate <String, MyDomainObject> Predicate2 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
KStream <String, MyDomainObject>[] branches= myStream.branch(
Predicate1, Predicate2
);
// here I need your suggestions.
// this is my current implementation
branches[0].to(singleTopic),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));
// I want to send notification to multiple topics. something like below
branches[0].to(topicList),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));