От Postgres до Кафки с отслеживанием изменений - PullRequest
0 голосов

Этот вопрос следует , этот .

Основная задача - сделать соединения на стороне K SQL. Пример ниже проиллюстрирует это. Сообщения об инцидентах поступают в Кафку топи c. Структура этих сообщений:

[
    {
        "name": "from_ts", 
        "type": "bigint"
    },
    {
        "name": "to_ts", 
        "type": "bigint"
    },
    {
        "name": "rulenode_id",
        "type": "int"
    }
]

И есть Postgres таблица rulenode:

id | name | description 

Данные из обоих источников должны быть объединены полями rulenode_id = rulenode.id, поэтому как получить одну запись с полями from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description.

Я хочу сделать это с помощью K SQL, но не с бэкэндом, как сейчас.

Сейчас данные из таблицы Postgres передаются в Kafka с помощью JdbcSourceConnector. Но есть одна маленькая проблема - как вы могли догадаться, данные в таблице Postgres могут быть изменены. И, конечно же, эти изменения должны быть в таблице потоков ИЛИ K SQL.

Ниже меня спросили, почему KTable, а не Kstream. Ну, пожалуйста, посетите эту страницу и посмотрите на первый GIF. Там записи таблицы обновляются при поступлении новых данных. Я думал, что такое поведение - то, что мне нужно (где вместо имен Алиса, Боб у меня есть первичный ключ id из Postgres table rulenode). Вот почему я выбрал KTable.

Массовый режим JdbcSourceConnect копирует всю таблицу. И, как вы знаете, все строки поступают в таблицу Kafka для предыдущих Postgres снимков таблицы.


Как и предполагалось, я создал соединитель с конфигами:

{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}

Затем создал поток:

create stream rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');

и теперь пытаюсь создать таблицу:

create table rulenodes_unique 
    as select * from rulenodes;

, но это не сработало с ошибкой:

Неверный тип результата. Ваш запрос SELECT создает поток. Пожалуйста, используйте вместо этого оператор CREATE STREAM AS SELECT.

Я читал, что таблицы используются для хранения агрегированной информации. Например, для хранения в совокупности с функцией COUNT:

create table rulenodes_unique 
    as select id, count(*) from rulenodes order by id;

Скажите, пожалуйста, как обработать эту ошибку?

Ответы [ 2 ]

1 голос
/ 03 марта 2020

Вы можете создать STREAM или TABLE поверх Kafka topi c с помощью ksqlDB - это связано с тем, как вы хотите смоделировать данные. Из вашего вопроса ясно, что вам нужно смоделировать его как таблицу (потому что вы хотите присоединиться к последней версии ключа ). Поэтому вам нужно сделать следующее:

create table rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

Теперь есть еще одна вещь, которую вы должны сделать , это убедиться, что данные в вашем topi c правильно введены. Вы не можете указать key='id', и это происходит автоматически - параметр key является просто «подсказкой». Вы должны убедиться, что сообщения в Kafka topi c имеют поле id в ключе . См. ref do c для получения полной информации.

Вы можете сделать это с помощью Преобразования одного сообщения в Kafka Connect :

"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"

Или вы можете сделать это в ksqlDB и изменить ключ - и потому что мы хотим чтобы обработать каждое событие , мы сначала смоделируем его как поток (!) и объявим таблицу поверх повторно введенной клавиши topi c:

create stream rulenodes_source 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

CREATE STREAM RULENODES_REKEY AS SELECT * FROM rulenodes_source PARITION BY id;

CREATE TABLE rulenodes WITH (kafka_topic='RULENODES_REKEY', value_format='avro');

Я хотел бы go маршрут преобразования одного сообщения, потому что в целом он более точный и простой.

1 голос
/ 02 марта 2020

Непонятно, какой оператор выдает ошибку, но вводит в заблуждение, если в определении таблицы

Вы можете создавать таблицы непосредственно из тем. Не нужно go через поток

https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html

Если вы хотите использовать поток, как говорят в документах

Используйте оператор CREATE TABLE <b>AS SELECT</b> для создания таблицы с результатами запроса из существующей таблицы или потока.

Возможно, вы захотите использовать в операторах значения, чувствительные к регистру

CREATE STREAM rulenodes WITH (
    KAFKA_TOPIC ='pg.rules_rulenode', 
    VALUE_FORMAT='AVRO', 
    KEY='id'
);


CREATE TABLE rulenodes_unique AS
    SELECT id, COUNT(*) FROM rulenodes 
    ORDER BY id;
...