массовое обновление данных из MS SQL vis kafka connector в elasticsearch с вложенными типами не работает - PullRequest
0 голосов

MS SQL значение: имя столбца = опора -> значение = 100 и имя столбца = роль -> значение = [{"роль": "актер"}, {"роль": "директор"}] ПРИМЕЧАНИЕ: столбец: роль сохраняется в формате json.

прочитано из kafka topi c:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"prop"
         },
         {
            "type":"string",
            "optional":true,
            "field":"roles"
         }
 ],
      "optional":false
   },
   "payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}

ошибка с указанием причины:

Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}

Причина сбоя в том, что соединитель не может создать схему как массив для ролей

Приведенное выше входное сообщение создается сливающимся JdbcSourceConnector и используемым соединителем приемника является конфлюентным ElasticsearchSinkConnector

Детали конфигурации:

Конфигурация приемника:

name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>

tasks.max=1

topics=test_prop
type.name=prop

#transforms=InsertKey, ExtractId

transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop

transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop

Конфигурация источника:

name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop

transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop

connect- standalone.properties:

    bootstrap.servers=localhost:9092

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

необходимо понять, насколько явно я могу сделать схему как ARRAY для ролей, а не строку.

1 Ответ

0 голосов

Тот факт, что коннектор jdb c Source всегда будет видеть имя столбца как имя поля и значение столбца как значение. Это преобразование из строки в массив невозможно с существующей поддержкой исходного коннектора jdb c, для его включения должно быть настраиваемое преобразование или настраиваемый плагин.

Лучший вариант, доступный для получения данных, передаваемых из MS SQL и вставка его в поиск Elasti c осуществляется с помощью logsta sh. Он имеет богатые плагины фильтров, которые позволяют передавать данные из MS SQL в любом желаемом формате и в любую желаемую среду вывода JSON (logstash / kafka topi c)

Flow: MS SQL -> logsta sh -> Kakfa Topi c -> Kafka Elasti c коннектор раковины -> Elasti c Искать

...