Этот вопрос следует , этот .
Основная задача - сделать соединения на стороне K SQL. Пример ниже проиллюстрирует это. Сообщения об инцидентах поступают в Кафку топи c. Структура этих сообщений:
[
{
"name": "from_ts",
"type": "bigint"
},
{
"name": "to_ts",
"type": "bigint"
},
{
"name": "rulenode_id",
"type": "int"
}
]
И есть Postgres таблица rulenode
:
id | name | description
Данные из обоих источников должны быть объединены полями rulenode_id = rulenode.id
, поэтому как получить одну запись с полями from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description
.
Я хочу сделать это с помощью K SQL, но не с бэкэндом, как сейчас.
Сейчас данные из таблицы Postgres передаются в Kafka с помощью JdbcSourceConnector. Но есть одна маленькая проблема - как вы могли догадаться, данные в таблице Postgres могут быть изменены. И, конечно же, эти изменения должны быть в таблице потоков ИЛИ K SQL.
Ниже меня спросили, почему KTable, а не Kstream. Ну, пожалуйста, посетите эту страницу и посмотрите на первый GIF. Там записи таблицы обновляются при поступлении новых данных. Я думал, что такое поведение - то, что мне нужно (где вместо имен Алиса, Боб у меня есть первичный ключ id
из Postgres table rulenode
). Вот почему я выбрал KTable.
Массовый режим JdbcSourceConnect копирует всю таблицу. И, как вы знаете, все строки поступают в таблицу Kafka для предыдущих Postgres снимков таблицы.
Как и предполагалось, я создал соединитель с конфигами:
{
"name": "from-pg",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"errors.log.enable": "true",
"connection.url": "connection.url",
"connection.user": "postgres",
"connection.password": "*************",
"table.whitelist": "rulenode",
"mode": "bulk",
"poll.interval.ms": "5000",
"topic.prefix": "pg."
}
Затем создал поток:
create stream rulenodes
with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');
и теперь пытаюсь создать таблицу:
create table rulenodes_unique
as select * from rulenodes;
, но это не сработало с ошибкой:
Неверный тип результата. Ваш запрос SELECT создает поток. Пожалуйста, используйте вместо этого оператор CREATE STREAM AS SELECT.
Я читал, что таблицы используются для хранения агрегированной информации. Например, для хранения в совокупности с функцией COUNT:
create table rulenodes_unique
as select id, count(*) from rulenodes order by id;
Скажите, пожалуйста, как обработать эту ошибку?