Я застрял в потоках kafka и не могу справиться со сценарием с DSL. Может кто-нибудь, пожалуйста, помогите.
Сценарий: у меня есть тема timeOff, в которой есть ключ timeOffId и значение типа object. Объект также содержит идентификатор сотрудника, который представляет время ожидания этого сотрудника. Таким образом, у одного сотрудника может быть несколько временных перерывов.
TimeOffs
timeoff1 {status:PENDING, employee: 1}
timeoff2 {status:PENDING, employee: 2}
timeoff3 {status:PENDING, employee: 3}
timeoff1 {status:APPROVED, employee: 1}
timeoff5 {status:PENDING, employee: 2}
timeoff3 {status:APPROVED, employee: 3}
timeoff6 {status:PENDING, employee: 1}
timeoff7 {status:PENDING, employee: 1}
timeoff8 {status:PENDING, employee: 2}
Я хочу получить результат, как показано ниже, чтобы у сотрудника могли быть только его ожидающие перерывы:
employee1: [timeoff6, timeoff7] //as timeoff1 is already approved so don't need this now.
employee2: [timeoff2, timeoff5, timeoff8] //as all timeoffs for employee2 are pending
employee3: [] //No pending timeoffs
Как мне это сделать? ,Я начал делать что-то вроде приведенного ниже кода, но я не знаю, правильно ли я это делаю или нет.
Мне не нужен код, но просто предлагаю мне правильный / хороший подход сделать это через kafkaПотоки DSL. Спасибо. В приведенном ниже примере я передаю тему и группирую тайм-ауты по employeeId. Но в таком случае как мне получить обновленный статус тайм-аута. Я смущен. Может ли кто-нибудь помочь.
KStream<String, TimeOff> source = builder.stream(topic);
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));