Я пытаюсь отразить таблицу K SQL в моей Postgres БД, используя разъем приемника JDB C, но, к сожалению, я не могу заставить его работать.
Я использую Kafka 5.4.1 и у меня есть 2 темы debezium 1.0, сериализованные с Avro, из моей Postgres БД. Это конфигурация моего коннектора Debezium:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka-svc:9092",
"database.history.kafka.topic": "dbhistory.xxx",
"database.server.name": "xxx",
"database.port": "5432",
"plugin.name": "decoderbufs",
"table.whitelist": "public.a,public.b",
"database.hostname": "app-db",
"name": "connector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"database.whitelist": "xxx",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.source.fields": "table"
}
Затем я использую K SQL CLI для взаимодействия с моим сервером и выдаю следующие команды:
CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');
CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');
CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;
CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;
TLDR, я создаю 2 потока из тем debezium и разделите их, чтобы подготовить их к присоединению. Затем я превращаю один из них (b_by_id) в таблицу, потому что я не хочу использовать оконное соединение в этом случае:
CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');
На данный момент все работает отлично, и я могу играть со своими потоками и таблицы и объединения и видим, что изменения в моей исходной БД немедленно отражаются в моих потоковых запросах в K SQL. Моя проблема возникает, когда я решаю выполнить некоторую статистическую функцию над моими данными и отразить результаты в моей Postgres БД (такой же, как исходная БД). Чтобы сделать это, я создаю новую таблицу K SQL в результате SELECT:
CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;
Затем я установил соединитель приемника JDB C, чтобы выгрузить список изменений grouped_data topi c моей новой таблицы в моей БД со следующей конфигурацией:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"topics": "grouped_data",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-svc:8081",
"pk.mode": "record_value",
"pk.fields": "x, y, z",
"table.name.format" : "kafka_${topic}",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "max_date",
"transforms.TimestampConverter.target.type": "Timestamp"
}
К сожалению, ничего не происходит, нет ошибок и нет данных на моей приемной БД. Соединитель правильно создан и настроен, и даже если я заставлю новые сообщения обрабатывать мои потоковые запросы, никакие данные не будут переданы в мою приемную БД, таблица назначения даже не будет создана. Я несколько раз пытался создать соединитель с разными именами и конфигурациями, разными значениями для pk.mode и т. Д. c, но не смог заставить его работать. Создание соединителя для моей таблицы "b", описанной выше, прекрасно работает, и все данные немедленно передаются.
Вот дополнительные сведения о таблице K SQL, которую я пытаюсь отразить в postgres:
describe extended grouped_data;
Name : GROUPED_DATA
Type : TABLE
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
Field | Type
------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
X | BIGINT
Y | BIGINT
Z | BIGINT
MAX_DATE | BIGINT
------------------------------------------------
Спасибо!