Я ищу лучший подход для следующей проблемы:
Kafka topi c (пример данных):
{ id: 0, value: '1', value_changed_on: 'lastWeek', value_type: 'user' }
{ id: 0, value: '2', value_changed_on: 'today', value_type: 'user' }
{ id: 0, value: '3', value_changed_on: 'yesterday', value_type: 'user' }
{ id: 0, value: 'foo', value_changed_on: 'yesterday', value_type: 'object' }
Желаемый результат (приемник: mysql)
{ id: 0, user_value: '2', user_value_changed_on: 'today', object_value: 'foo', object_value_changed_on: "yesterday }
Итак, по сути, я группирую его по «id», обогащаю / объединяю его со столбцом «user» + «object» и пропускаю / игнорирую устаревшие обновления (поскольку входной topi c не может гарантировать правильный заказ по деловым причинам (пакетная обработка и т. д. c.)
Мое текущее решение:
Простой потребитель с некоторыми java logi c для сохранения и объединения даты :
псевдо:
if(record.type == "user") {
if(mysqlQuery...record.user_value_changed_on <= record.changed_on) {
persist();
} else {
//skip
}
} else if(record.type == "object") {
if(mysqlQuery...record.object_value_changed_on <= record.changed_on) {
persist();
} else {
//skip
}
}
Интересно, есть ли лучший способ, используя Streams + Ktable для решения этой проблемы?
Stream->KTable(as a local state to keep track of latest updates)->publish(newOrderedTopic)->KConnect(toMySQLSink)
По сути, вместо 2 mysql запросов для upsert, выполнение одного запроса KTable.get для проверки последнего изменения ts, а затем 1 mysql запроса вставки, если ts больше.
Обоснование использования ktable для проверки вместо второго запроса mysql я считаю, что он будет более производительным (производство будет r миллионы этих записей в час)
Я новичок в Kafka + Streams, поэтому был бы признателен за любой вклад, если это осуществимый подход, и, может быть, кто-то может указать мне на пример? Мне не удалось найти в Интернете ничего с этой конкретной проблемой c.