Невозможно десериализовать экземпляр `java.lang.String` на сервере node.js для подключения к kafka - PullRequest
0 голосов
/ 22 сентября 2019

Я использую curl для отправки нашей службе Kafka Connect сообщения запроса JSON с информацией о соединителе, он работает успешно.

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'

сейчас я использую сервер node.js для отправки данных в kafkaподключите сервер.

  var body = {
  "name": "abc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
};

  var options = {
      method: 'PUT',
      uri: 'http://localhost/connectors/abc/config',
      headers: {
          'User-Agent': 'Request-Promise'
      },
      json: true ,
      body: body
  };

  rp(options)
      .then(function (data) {
          return res.status(200).json({ 'data': data});
      })
      .catch(function (err) {
        console.log(err);
          return res.status(500).json({ error: err});
      });

однако код выдает ошибку: поговорка

{ StatusCodeError: 500 - {"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT token\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 42] (through reference chain: java.util.LinkedHashMap[\"config\"])"}

Описание API от https://docs.confluent.io/current/connect/references/restapi.html

enter image description here

1 Ответ

1 голос
/ 22 сентября 2019

Если я правильно прочитал конфлюентный документ, вы перепутали две разные конечные точки API.

В своем коде вы используете конечную точку /connectors/abc/config, которая согласно документации принимает один объект конфигурации в качестве верхнего уровнявот так:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "mysql",
  "database.port": "3306",
  "database.user": "debezium",
  "database.password": "dbz",
  "database.server.id": "184054",
  "database.server.name": "dbserver1",
  "database.whitelist": "inventory",
  "database.history.kafka.bootstrap.servers": "kafka:9092",
  "database.history.kafka.topic": "schema-changes.inventory"
}

Но ваш JSON-объект выглядит так, как будто он предназначен для конечной точки /connectors.

Изменение либо конечной точки, либо вашего JSON-объекта в соответствии с имеющейся у вас конечной точкойВыбранный может решить проблему.

...