Не удается сохранить таблицу агрегирования K SQL до Postgres - PullRequest
1 голос
/ 25 марта 2020

Я пытаюсь отразить таблицу K SQL в моей Postgres БД, используя разъем приемника JDB C, но, к сожалению, я не могу заставить его работать.

Я использую Kafka 5.4.1 и у меня есть 2 темы debezium 1.0, сериализованные с Avro, из моей Postgres БД. Это конфигурация моего коннектора Debezium:

    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "xxx",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka-svc:9092",
        "database.history.kafka.topic": "dbhistory.xxx",
        "database.server.name": "xxx",
        "database.port": "5432",
        "plugin.name": "decoderbufs",
        "table.whitelist": "public.a,public.b",
        "database.hostname": "app-db",
        "name": "connector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "database.whitelist": "xxx",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "table"
    }

Затем я использую K SQL CLI для взаимодействия с моим сервером и выдаю следующие команды:

CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');

CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');

CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;

CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;

TLDR, я создаю 2 потока из тем debezium и разделите их, чтобы подготовить их к присоединению. Затем я превращаю один из них (b_by_id) в таблицу, потому что я не хочу использовать оконное соединение в этом случае:

CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');

На данный момент все работает отлично, и я могу играть со своими потоками и таблицы и объединения и видим, что изменения в моей исходной БД немедленно отражаются в моих потоковых запросах в K SQL. Моя проблема возникает, когда я решаю выполнить некоторую статистическую функцию над моими данными и отразить результаты в моей Postgres БД (такой же, как исходная БД). Чтобы сделать это, я создаю новую таблицу K SQL в результате SELECT:

CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;

Затем я установил соединитель приемника JDB C, чтобы выгрузить список изменений grouped_data topi c моей новой таблицы в моей БД со следующей конфигурацией:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://app-db:5432/xxx",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "topics": "grouped_data",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-svc:8081",
    "pk.mode": "record_value",
    "pk.fields": "x, y, z",
    "table.name.format" : "kafka_${topic}",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "max_date",
    "transforms.TimestampConverter.target.type": "Timestamp"
}

К сожалению, ничего не происходит, нет ошибок и нет данных на моей приемной БД. Соединитель правильно создан и настроен, и даже если я заставлю новые сообщения обрабатывать мои потоковые запросы, никакие данные не будут переданы в мою приемную БД, таблица назначения даже не будет создана. Я несколько раз пытался создать соединитель с разными именами и конфигурациями, разными значениями для pk.mode и т. Д. c, но не смог заставить его работать. Создание соединителя для моей таблицы "b", описанной выше, прекрасно работает, и все данные немедленно передаются.

Вот дополнительные сведения о таблице K SQL, которую я пытаюсь отразить в postgres:

describe extended grouped_data;

Name                 : GROUPED_DATA
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

 Field              | Type                      
------------------------------------------------
 ROWTIME            | BIGINT           (system) 
 ROWKEY             | VARCHAR(STRING)  (system) 
 X                  | BIGINT                    
 Y                  | BIGINT                    
 Z                  | BIGINT                    
 MAX_DATE           | BIGINT                    
------------------------------------------------

Спасибо!

1 Ответ

0 голосов
/ 26 марта 2020

Вы настроили Kafka Connect для использования topi c name в нижнем регистре * name

"topics": "grouped_data",

, но для вашего DESCRIBE вывода topi c, в который пишет таблица, находится в верхнем -case:

Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

Если вы внимательно посмотрите свой журнал Kafka Connect, вы обнаружите следующее:

Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE} 

Kafka Connect не прервется, если вы дадите ему topi c этого не существует - потому что, возможно, это топи c, который вы имели в виду , чтобы указать, потому что вы собираетесь впоследствии заполнить его.

Итак, , вы можете либо изменить рабочую конфигурацию Kafka Connect, чтобы использовать имя topi c верхнего регистра, либо переопределить таблицу ksqlDB и включить …WITH (KAFKA_TOPIC='grouped_data') в DDL. ,

...