У меня есть KStream с данными из темы to1 , как это:
T1-KEY -> {T1}
T2-KEY -> {T2}
и KTable , сконструированные следующим образом:
Я использую org.apache.kafka.streams.StreamsBuilder , чтобы создать KTable из некоторой темы to2 , которая выглядит следующим образом:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
Затем поток выводится на карту и группируется по Key s.t. результирующий KTable выглядит так:
T1 -> { ["B1", "B2"] }
Позже, теперь в теме появляется следующее сообщение to2 :
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
Теперь я ожидаю, что моя KTable отразит изменения и будет выглядеть следующим образом:
T1 -> { ["B2"] }
но это выглядит так:
T1 -> { ["B1", "B2"] }
Я заметил, что в моем Aggregator<Tx-KEY, Bx, Set<Bx>>
последним аргументом является набор ["B1", "B2"]
, хотя, когда я смотрю перед агрегацией, я получаю только одно совпадение "B2"
.
Я неправильно понимаю совокупность или что здесь происходит?
EDIT
Думаю, я сузил это: очевидно, Initializer
агрегата вызывается только для очень в первый раз - после этого агрегат всегда получает last aggregate
в качестве последнего аргумента, например,
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
, где Set<Bx> aggregate
- это []
для самого первого вызова (созданного с помощью инициализатора), но ["B1", "B2"]
для второго вызова.
Есть идеи?
РЕДАКТИРОВАТЬ 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
РЕДАКТИРОВАТЬ 3
Я не могу использовать только плоскую карту, так как мне нужно объединить несколько элементов Ax, например,
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
где я тогда ожидаю какую-то группу, например
T1 -> { ["B1", "B2"] }
и в следующей итерации, когда сообщение
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
прибывает, я ожидал
T1 -> { ["B1"] }
..