Кафка: создание потока из topi c со значениями в отдельных столбцах - PullRequest
0 голосов
/ 01 апреля 2020

Я только что подключил свою кафку к postgres с помощью postgres разъема источника. Теперь, когда я печатаю topi c, я получаю следующий вывод:

rowtime: 4/1/20 4:16:12 PM UTC, key: <null>, value: {"userid": 4, "id": 5, "title": "lorem", "body": "dolor sit amet, consectetur"}
rowtime: 4/1/20 4:16:12 PM UTC, key: <null>, value: {"userid": 5, "id": 6, "title": "ipsum", "body": "cupidatat non proident"}

Как создать поток из этой топики c, чтобы значения были разделены на свои собственные столбцы, как они были в Изначально таблица базы данных?

Дополнительный вопрос: есть ли способ указать jdb c -connector для разделения столбцов на topi c при создании исходного соединителя?

Мой соединитель выглядит как это:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_02",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/kafka",
                "connection.user": "bob",
                "connection.password": "builder",
                "topic.prefix": "post_",
                "mode":"bulk",
                "table.whitelist" : "kafka_t.users_t",
                "poll.interval.ms" : 500
                }
        }'

1 Ответ

1 голос
/ 02 апреля 2020

Как создать поток из этой топи c, чтобы значения были разделены на свои собственные столбцы, как они изначально были в таблице базы данных?

Не уверен на 100% что вы имеете в виду под этим. Если вы используете Kafka Streams, вы можете, например, создать KStream<KeyType, Columns> с пользовательским типом Columns (или просто использовать JSON в качестве типа значения), чтобы получить «представление столбца» для ваших данных.

Точно так же вы можете использовать ksqlDB с командой CREATE STREAM - она ​​может автоматически разбирать значение JSON на соответствующие столбцы.

Дополнительный вопрос: есть ли способ указать в jdb c -коннектор для разделения столбцов на topi c при создании исходного коннектора?

Что вы подразумеваете под этим? Kafka topi c имеет модель данных ключ-значение, и поэтому, если вы сохраняете какие-либо данные в topi c, они должны либо go в ключе, либо в значении. Если у вас более структурированный тип, такой как кортеж БД, у брокеров Kafka нет встроенной поддержки, но вам нужно вписать его в модель ключ-значение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...