Приемник Kafka Connect JDBC - хранение агрегированных данных из KSQL - PullRequest
1 голос
/ 17 мая 2019

Я пытаюсь сохранить результат запроса ksql (оконную таблицу) в базу данных, используя kafka connect.

Теперь я хочу (используя kafka jdbc connect) сохранить их в базе данных, но обновляемые строки (так как таблица kafka в настоящее время обновляет значения) вместо просто потока сообщений;

Итак, хотя сообщения будут такими:

1558042958867 | User_9 : Window{start=1558042920000 end=-} | User_9 | 20
1558042961348 | User_9 : Window{start=1558042920000 end=-} | User_9 | 21
1558042962141 | User_9 : Window{start=1558042920000 end=-} | User_9 | 22
1558042965552 | User_9 : Window{start=1558042920000 end=-} | User_9 | 23
1558042968275 | User_9 : Window{start=1558042920000 end=-} | User_9 | 24
1558042969668 | User_9 : Window{start=1558042920000 end=-} | User_9 | 25
1558042973915 | User_9 : Window{start=1558042920000 end=-} | User_9 | 26
1558042976235 | User_9 : Window{start=1558042920000 end=-} | User_9 | 27
1558042980197 | User_9 : Window{start=1558042980000 end=-} | User_9 | 1
1558042980635 | User_9 : Window{start=1558042980000 end=-} | User_9 | 2
1558042982969 | User_9 : Window{start=1558042980000 end=-} | User_9 | 3
1558042983511 | User_9 : Window{start=1558042980000 end=-} | User_9 | 4
1558042986352 | User_9 : Window{start=1558042980000 end=-} | User_9 | 5
1558042986863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 6
1558042988328 | User_9 : Window{start=1558042980000 end=-} | User_9 | 7
1558042988863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 8

В базе данных хотелось бы только:

User_9 : Window{start=1558042920000 end=-} | User_9 | 27
User_9 : Window{start=1558042980000 end=-} | User_9 | 8

Как то так. Есть ли какая-то магия вокруг ksql / kafka-connect, которая позволила бы мне сделать это?

Для пояснения - последнее поле - это агрегат, который подсчитывает, сколько времени x произошло за время окна.

Я бы предположил, что мог бы получить начало окна + ключ в качестве ключа базы данных и обновить их, но я не уверен, как этого добиться в KSQL. Может быть, это было бы возможно с Kafka Streams?

@ Edit:

Хорошо, мне удалось это сделать, добавив эти свойства в конфигурацию приемника:

pk.mode=record_key
pk.fields=rowkey
insert.mode=upsert

Теперь строки обновляются, но данные окна немного лишены смысла, это выглядит так:

TOTAL   USERID  rowkey
32      User_9  User_9j�
31      User_9  User_9jı�`
22      User_9  User_9jIJ��
1       User_9  User_9jij� 

Значит, окно есть, но в двоичном коде? Не уверен, что там происходит. Мне все еще нужно получить эту дату в каком-то формате, который читается

1 Ответ

0 голосов
/ 19 мая 2019

ок, нашел решение. Итак, прежде всего мне нужно было в запросе создать поля window_start / window_end, например:

SELECT [...], WINDOWSTART() AS window_start, WINDOWEND() AS window_end, [...]

После этого мне пришлось добавить эти параметры в сток:

pk.mode=record_value
pk.fields=[...],WINDOW_START
insert.mode=upsert

Это работает.

...