Совокупность потоков Kafka с удалениями и / или изменениями ключей - PullRequest
0 голосов
/ 23 марта 2019

Я пытаюсь определить поток kafka, который принимает записи из темы, скажем, EMPLOYEE, где записи содержат атрибуты о сотруднике и его отделе, и преобразовывает его в другую тему, DEPARTMENT, которая содержит атрибуты отдела, и список всех сотрудников (с несколькими изменениями без сохранения состояния на сотруднике).

Записи РАБОТНИКА повторяют данные отдела. ( Я на самом деле имею дело с некоторыми данными заголовка DICOM, но я придерживаюсь более универсально понятных отношений. Я пытаюсь понять общее решение ). Кроме того, записи в теме содержат только текущие данные (т.е. без предварительного идентификатора отдела, если отдел изменился.)

Это похоже на работу для совокупности. У меня есть кое-что, что, кажется, работает для простого случая:

        ...
        KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
        stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
                .groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))

                // dummy reduce to a get a ktable for agg:
                .reduce((aggValue, newEmp) -> newEmp) 
                .groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))

                .aggregate(Department::new, this::addEmployee, this::removeEmployee,
                           jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
                .toStream()
                .to("DEPARTMENT", jsonProducedWith(Department.class));
        ...

    private Department addEmployee(String deptId, Employee employee, Department department) {
        department.addEmployee(employee);
        if (department.getId() == null) {
            department.setId(employee.getDepartmentId());
            department.setName(employee.getDepartmentName());
        }
        return department;
    }

Это работает для добавления или обновления. Однако со временем сотрудник может быть удален или переназначен в другой отдел. Я полагаю, что удаление должно быть надгробной записью (k: empId, v: null), отправленной в тему EMPLOYEE. Тем не менее, у меня больше нет отдела ID, я должен был бы выполнить нулевую проверку (и вернуть нулевое значение для DepartmentId), поэтому удаление при удалении сотрудника никогда не происходит. Аналогичная проблема для смены отдела.

Итак, как кафка подходит к этому?

1 Ответ

0 голосов
/ 23 марта 2019

Я думаю, достаточно использовать ваш код, но немного изменить семантику удаления сотрудника.

Вы должны добавить какой-то Mock отдел (будет использоваться при удалении пользователя из отдела).

Если сотрудник удален, вместо того, чтобы установить отдел на null, его следует присвоить Mock отдел.

...