kafka connect Transform base64decode поле - PullRequest
0 голосов
/ 30 марта 2020

Я использую исходный соединитель kafka RMQ для извлечения данных из очереди RMQ. Одно из полей имеет кодировку base64 и имеет структуру json. Я использую Извлечь преобразование, чтобы извлечь это поле, но я не уверен, как декодировать это поле, я попытался написать свой собственный smt для декодирования этого поля, но я получаю ошибку java.lang.ClassCastException: [B cannot be cast to java.lang.String, когда я декодирую это поле и пытаюсь поместите его в обновленное значение записи, потому что сообщение является Json структурой. Мы очень ценим вашу помощь.

Ниже приведена конфигурация моего коннектора.

{
    "name" : "RabbitMQ_Source",
    "connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "RMQ_Topic",
    "rabbitmq.queue" : "rmqqueue",
    "rabbitmq.username":"username",
    "rabbitmq.virtual.host":"dummy",
    "rabbitmq.password":"password",
    "rabbitmq.host":"x.x.x.x",
    "rabbitmq.port":"5674",
    "transforms": "ExtractField",
    "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field":"body"
  }

Ниже приведено сообщение в очереди, которое я пытаюсь использовать, Body является основным полем, которое необходимо быть отправленным в Kafka topi c после расшифровки. Если я просто использую извлечение преобразования, то оно работает нормально, но я вижу только закодированное сообщение в kafka topi c.

{
   "consumerTag": "abcd",
   "envelope": {
      "deliveryTag": 1,
      "isRedeliver": false,
      "exchange": "rmqqueue",
      "routingKey": "rmqqueue"
   },
   "basicProperties": {
      "contentType": "text/plain",
      "contentEncoding": null,
      "headers": {},
      "deliveryMode": 2,
      "priority": 0,
      "correlationId": null,
      "replyTo": null,
      "expiration": null,
      "messageId": null,
      "timestamp": null,
      "type": null,
      "userId": null,
      "appId": null
   },
   "body": "eyJXSFMiOlt7IkNoYXJhY3RlciBTZXQiOiJVVEYtOCIsImFjdGlvbiI6InJld3JpdGUiLCJVcGRhdGUtRGF0ZS1UaW1lIjoiMjAyMC0wMy0yNSAwOTowMDowMjoxOSJ9XX0="
}

1 Ответ

0 голосов
/ 30 марта 2020

Вам нужно использовать ByteArrayConverter

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

. Вот пример этого здесь .

...