MS SQL значение: имя столбца = опора -> значение = 100 и имя столбца = роль -> значение = [{"роль": "актер"}, {"роль": "директор"}] ПРИМЕЧАНИЕ: столбец: роль сохраняется в формате json.
прочитано из kafka topi c:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"prop"
},
{
"type":"string",
"optional":true,
"field":"roles"
}
],
"optional":false
},
"payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}
ошибка с указанием причины:
Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}
Причина сбоя в том, что соединитель не может создать схему как массив для ролей
Приведенное выше входное сообщение создается сливающимся JdbcSourceConnector и используемым соединителем приемника является конфлюентным ElasticsearchSinkConnector
Детали конфигурации:
Конфигурация приемника:
name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>
tasks.max=1
topics=test_prop
type.name=prop
#transforms=InsertKey, ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop
Конфигурация источника:
name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop
connect- standalone.properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
необходимо понять, насколько явно я могу сделать схему как ARRAY для ролей, а не строку.