Как определить сессию законченную в потоках кафки - PullRequest
0 голосов
/ 15 октября 2019

Я застрял в потоках kafka и не могу справиться со сценарием с DSL. Может кто-нибудь, пожалуйста, помогите.

Сценарий: у меня есть тема timeOff, в которой есть ключ timeOffId и значение типа object. Объект также содержит идентификатор сотрудника, который представляет время ожидания этого сотрудника. Таким образом, у одного сотрудника может быть несколько временных перерывов.

TimeOffs 

timeoff1 {status:PENDING, employee: 1}
timeoff2 {status:PENDING, employee: 2}
timeoff3 {status:PENDING, employee: 3}
timeoff1 {status:APPROVED, employee: 1}
timeoff5 {status:PENDING, employee: 2}
timeoff3 {status:APPROVED, employee: 3}
timeoff6 {status:PENDING, employee: 1}
timeoff7 {status:PENDING, employee: 1}
timeoff8 {status:PENDING, employee: 2}

Я хочу получить результат, как показано ниже, чтобы у сотрудника могли быть только его ожидающие перерывы:

employee1: [timeoff6, timeoff7] //as timeoff1 is already approved so don't need this now.
employee2: [timeoff2, timeoff5, timeoff8] //as all timeoffs for employee2 are pending
employee3: [] //No pending timeoffs

Как мне это сделать? ,Я начал делать что-то вроде приведенного ниже кода, но я не знаю, правильно ли я это делаю или нет.

Мне не нужен код, но просто предлагаю мне правильный / хороший подход сделать это через kafkaПотоки DSL. Спасибо. В приведенном ниже примере я передаю тему и группирую тайм-ауты по employeeId. Но в таком случае как мне получить обновленный статус тайм-аута. Я смущен. Может ли кто-нибудь помочь.

KStream<String, TimeOff> source = builder.stream(topic);
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
        .aggregate(ArrayList::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                }, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));

1 Ответ

0 голосов
/ 16 октября 2019

Я думаю, что лучший подход - использовать Processor API.

Вы должны реализовать свой собственный org.apache.kafka.streams.processor.Processor. Processor будет иметь хранилище состояний, чтобы держать TimeOffs в состоянии ожидания, и когда придет Timeoff со статусом APPROVED, запись из хранилища состояний будет удалена.

Это будет примерно так:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class CustomProcessor implements Processor<String, Timeoff> {

    protected KeyValueStore<String, List<Timeoff>> stateStore;
    private String storeName;

    public CustomProcessor(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        stateStore = (KeyValueStore<String, List<Timeoff>>) context.getStateStore(storeName);
    }

    @Override
    public void process(String employeeId, Timeoff timeoff) {
        List<Timeoff> newTimeoffs = Optional.ofNullable(stateStore.get(employeeId)).map(timeoffs -> {
            if ("APPROVED".equals(timeoff.getStatus()))
                timeoffs.remove(employeeId);
            else
                timeoffs.add(timeoff);
            return timeoffs;
        }).orElse(Collections.singletonList(timeoff));
        stateStore.put(employeeId, newTimeoffs);
    }

    ...
}
...