Kafka Connect - JSON Converter - JDBC Sink Connector - Тип столбца JSON - PullRequest
0 голосов
/ 22 января 2019

сценарий использования - это сохранение всего сообщения (которое является JSON) и ключа в виде записи в таблице с двумя столбцами 'id' и 'data'.

База данных - это Postgres, и она поддерживает тип столбца какJSON.

Согласно этой статье, поддерживаемыми типами в JSONConverter являются string, int64 и т. Д. https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter

Возможно ли иметь тип поля данных в виде JSON, которое затем можно сохранить в PostgresБД со столбцом типа JSON.

schema = `{
"type":"struct",
"fields":[
    {"type":"string", "optional": false, "field":"id"},
    {"type":"string", "optional": false, "field":"data"}
]}`

Пример полезной нагрузки данных:

"payload": { "id": 10000, "data": {"hello":"world"} }

Выше будут храниться данные в виде текста и ожидается, что столбец будет иметь тип текста в Postgres.Если столбец в Postgres имеет тип JSON, то коннектор Sink JDBC выдаст ошибку.

Использование типов JSON в Postgres поможет создать индекс для полей JSON и т. Д.Можно ли использовать JSONConverter вместе с JDBC Sink Converter для хранения записей с типом столбца JSON.

Ответы [ 2 ]

0 голосов
/ 28 января 2019

JDBC Sink Connector не поддерживает PostgreSQL json, jsonb типы.Он поддерживает несколько примитивных типов datetime.

На следующей странице вы можете найти типы схем отображения для типов баз данных (PostgreSQL) https://docs.confluent.io/5.1.0/connect/kafka-connect-jdbc/sink-connector/index.html

Хотя, JDBC Источник Соединительв некоторых частях поддерживаются типы json, jsonb - столбцы такого типа не будут сопоставлены с STRUCT, но будут сопоставлены с типом STRING.

0 голосов
/ 24 января 2019

Используйте value.converter.schema.enable=true и отправляйте данные JSON следующим образом (со схемой как частью каждого сообщения и обновляйте раздел payload фактическими данными сообщения), и он должен работать с приемником JDBC.

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": false,
            "field": "id"
        }, {
            "type": "struct",
            "name": "data",
            "optional": false,
            "fields": [{
               "type": "string",
               "name": "hello",
               "optional":false
            }]
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "id": 10000,
        "data": {"hello":"world"}
    }
}

Или вы можете обратить внимание клиентов на использование Avro и сэкономить некоторую пропускную способность сети.

...