У меня есть приложение KStreams, основанное на загрузке Spring, где я объединяю данные по нескольким темам. Каковы / являются наилучшими практиками для обработки ситуации, когда имеется задержка в одном топи c? Я прочитал ссылки, такие как Как управлять Kafka KStream для оконного соединения Kstream? и др.
Вот мой пример кода (Spring Boot App) для создания фиктивных данных по 2 темам - Сотрудник и финансы. Код для topi c ниже:
private void sendEmpData() {
IntStream.range(0, 1).forEach(index -> {
EmployeeKey key = new EmployeeKey();
key.setEmployeeId(1);
Employee employee = new Employee();
employee.setDepartmentId(1000);
employee.setEmployeeFirstName("John);
employee.setEmployeeId(1);
employee.setEmployeeLastName("Doe");
kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
});
}
Аналогично для topi c:
private void sendFinanceData() {
IntStream.range(0, 1).forEach(index -> {
FinanceKey key = new FinanceKey();
key.setEmployeeId(1);
key.setDepartmentId(1000);
Finance finance = new Finance();
finance.setDepartmentId(1000);
finance.setEmployeeId(1);
finance.setSalary(2000);
kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
});
}
Тип отметки времени, связанный с этими записями: TimeStampType.CREATE_TIME , который, как я предполагаю, совпадает с временем события в потоках.
У меня есть простое приложение KStreams, которое пересылает финансовый топи c, чтобы ключ финансового потока соответствовал ключу потока сотрудника, а затем выполните объединение, как показано ниже:
employeeKStream.join(reKeyedStream,
(employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
employee.getEmployeeFirstName(),
employee.getEmployeeLastName(),
employee.getDepartmentId(),
finance.getSalary(),
finance.getSalaryGrade()),
JoinWindows.of(windowRetentionTimeMs), //30 seconds
Joined.with(
employeeKeySerde,
employeeSerde,
financeSerde)).to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));
Если запись с соответствующим ключом поступает в финансовую топику спустя более 30 секунд спустя c, тогда объединение не происходит. Любое понимание того, как решить эту проблему, будет полезно. Заранее спасибо.
PS: Эти данные являются художественным произведением. Если он совпадает с идентификатором / зарплатой вашего отдела, это просто совпадение :)