KStreams - Как справиться с задержкой сообщений на одну тему - PullRequest
1 голос
/ 06 января 2020

У меня есть приложение KStreams, основанное на загрузке Spring, где я объединяю данные по нескольким темам. Каковы / являются наилучшими практиками для обработки ситуации, когда имеется задержка в одном топи c? Я прочитал ссылки, такие как Как управлять Kafka KStream для оконного соединения Kstream? и др.

Вот мой пример кода (Spring Boot App) для создания фиктивных данных по 2 темам - Сотрудник и финансы. Код для topi c ниже:

private void sendEmpData() {
    IntStream.range(0, 1).forEach(index -> {
        EmployeeKey key = new EmployeeKey();
        key.setEmployeeId(1);

        Employee employee = new Employee();
        employee.setDepartmentId(1000);
        employee.setEmployeeFirstName("John);
        employee.setEmployeeId(1);
        employee.setEmployeeLastName("Doe");

        kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
    });
}

Аналогично для topi c:

private void sendFinanceData() {
    IntStream.range(0, 1).forEach(index -> {
        FinanceKey key = new FinanceKey();
        key.setEmployeeId(1);
        key.setDepartmentId(1000);

        Finance finance = new Finance();
        finance.setDepartmentId(1000);
        finance.setEmployeeId(1);
        finance.setSalary(2000);

        kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
    });
}

Тип отметки времени, связанный с этими записями: TimeStampType.CREATE_TIME , который, как я предполагаю, совпадает с временем события в потоках.

У меня есть простое приложение KStreams, которое пересылает финансовый топи c, чтобы ключ финансового потока соответствовал ключу потока сотрудника, а затем выполните объединение, как показано ниже:

employeeKStream.join(reKeyedStream,
            (employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
                    employee.getEmployeeFirstName(),
                    employee.getEmployeeLastName(),
                    employee.getDepartmentId(),
                    finance.getSalary(),
                    finance.getSalaryGrade()),
            JoinWindows.of(windowRetentionTimeMs), //30 seconds
            Joined.with(
                    employeeKeySerde,
                    employeeSerde,
                    financeSerde)).to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));

Если запись с соответствующим ключом поступает в финансовую топику спустя более 30 секунд спустя c, тогда объединение не происходит. Любое понимание того, как решить эту проблему, будет полезно. Заранее спасибо.

PS: Эти данные являются художественным произведением. Если он совпадает с идентификатором / зарплатой вашего отдела, это просто совпадение :)

1 Ответ

0 голосов
/ 30 января 2020

Если запись с соответствующим ключом поступает более чем на 30 секунд позже в финансовом топи c, тогда соединение не происходит.

По умолчанию Kafka Streams использует льготное время 24-часовой период, следовательно, даже если данные запаздывают или выходят из строя, ваше объединение должно работать. Обратите внимание, что lag в Kafka всегда ссылается на путь read !

прибывает более чем на 30 секунд позже в финансах topi c

Однако, я думаю, вы на самом деле не имеете в виду, что у вас есть лаг (в том смысле, что вы откладываете чтение), но ваша восходящая запись задерживается - для в этом случае время события может быть просто назначено неправильно:

Обратите внимание, что при записи в топику Kafka c и если вы не указали метку времени явно, производитель назначит отметка времени при вызове send(), а не при создании экземпляра ProducerRecord. Если вы хотите назначить временную метку при создании ProducerRecord, вам нужно передать временную метку, которую вы хотите назначить, в конструктор вручную (не уверен, позволяет ли это при загрузке Spring).

Как В качестве альтернативы (если вы не можете установить метку времени записи явно), вы можете встроить метку времени в значение (это, конечно, требует изменения классов Employee и Finance. При обработке этих данных с помощью Kafka Streams вы можете используйте в пользовательском TimestampExtractor, чтобы получить свою временную метку вместо рекордной временной метки.

...