Я пытаюсь сохранить результат запроса ksql (оконную таблицу) в базу данных, используя kafka connect.
Теперь я хочу (используя kafka jdbc connect) сохранить их в базе данных, но обновляемые строки (так как таблица kafka в настоящее время обновляет значения) вместо просто потока сообщений;
Итак, хотя сообщения будут такими:
1558042958867 | User_9 : Window{start=1558042920000 end=-} | User_9 | 20
1558042961348 | User_9 : Window{start=1558042920000 end=-} | User_9 | 21
1558042962141 | User_9 : Window{start=1558042920000 end=-} | User_9 | 22
1558042965552 | User_9 : Window{start=1558042920000 end=-} | User_9 | 23
1558042968275 | User_9 : Window{start=1558042920000 end=-} | User_9 | 24
1558042969668 | User_9 : Window{start=1558042920000 end=-} | User_9 | 25
1558042973915 | User_9 : Window{start=1558042920000 end=-} | User_9 | 26
1558042976235 | User_9 : Window{start=1558042920000 end=-} | User_9 | 27
1558042980197 | User_9 : Window{start=1558042980000 end=-} | User_9 | 1
1558042980635 | User_9 : Window{start=1558042980000 end=-} | User_9 | 2
1558042982969 | User_9 : Window{start=1558042980000 end=-} | User_9 | 3
1558042983511 | User_9 : Window{start=1558042980000 end=-} | User_9 | 4
1558042986352 | User_9 : Window{start=1558042980000 end=-} | User_9 | 5
1558042986863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 6
1558042988328 | User_9 : Window{start=1558042980000 end=-} | User_9 | 7
1558042988863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 8
В базе данных хотелось бы только:
User_9 : Window{start=1558042920000 end=-} | User_9 | 27
User_9 : Window{start=1558042980000 end=-} | User_9 | 8
Как то так.
Есть ли какая-то магия вокруг ksql / kafka-connect, которая позволила бы мне сделать это?
Для пояснения - последнее поле - это агрегат, который подсчитывает, сколько времени x произошло за время окна.
Я бы предположил, что мог бы получить начало окна + ключ в качестве ключа базы данных и обновить их, но я не уверен, как этого добиться в KSQL. Может быть, это было бы возможно с Kafka Streams?
@ Edit:
Хорошо, мне удалось это сделать, добавив эти свойства в конфигурацию приемника:
pk.mode=record_key
pk.fields=rowkey
insert.mode=upsert
Теперь строки обновляются, но данные окна немного лишены смысла, это выглядит так:
TOTAL USERID rowkey
32 User_9 User_9j�
31 User_9 User_9jı�`
22 User_9 User_9jIJ��
1 User_9 User_9jij�
Значит, окно есть, но в двоичном коде? Не уверен, что там происходит.
Мне все еще нужно получить эту дату в каком-то формате, который читается