Чтение сообщения JMS как json, а не текста с помощью коннектора источника Kafka ActiveMQ - PullRequest
1 голос
/ 06 августа 2020

Я использую Kafka Connect последние пару месяцев, и недавно я включил плагин источника ActiveMQ, чтобы читать некоторые сообщения JMS topi c, которые содержат файл json внутри, помещать их в kafka topi c, а затем создайте поток / таблицу в Ksqldb, которая использует в качестве столбцов некоторые ключи, которые имеет файл json. Дело в том, что плагин вставляет сообщение JMS в виде текста с двойными кавычками, поэтому оно не распознается должным образом в Ksqldb. Я пробовал разные вещи в конфигурации, чтобы исправить это, но пока ничего не помогло. Я также хочу использовать форматирование json, а не Avro в kafka connect (реестр схемы тоже не работает). В целях тестирования я также попытался отправить JMS-сообщения, указав содержимое заголовка как «application / json», и все равно не повезло.

Вот как выглядит мой плагин ActiveMQ

 "config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}

и вот как выглядит моя конфигурация подключения Kafka

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1


config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.13-2.5.0/plugins

Также вот пример того, как Kafka потребляет сообщения

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": "{\"widget\": {     \"debug\": \"on\",    \"window\": {        \"title\": \"Sample Konfabulator Widget\",        \"name\": \"main_window\",        \"width\": 500,        \"height\": 500    },    \"image\": {        \"src\": \"Images/Sun.png\",        \"name\": \"sun1\",        \"hOffset\": 250,        \"vOffset\": 250,        \"alignment\": \"center\"    },    \"text\": {        \"data\": \"Click Here\",        \"size\": 36,        \"style\": \"bold\",        \"name\": \"text1\",        \"hOffset\": 250,        \"vOffset\": 100,        \"alignment\": \"center\",        \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}

Если требуется какая-либо другая информация, пожалуйста, дайте мне знать Любая помощь будет буду очень признателен.

ОБНОВЛЕНИЕ: В конечном итоге лучшим решением было бы каким-то образом настроить коннектор на не , избегая кавычек в полезной нагрузке. Также, к сожалению, экранированные кавычки генерируются из самого activeMQ и не являются частью исходного сообщения

Таким образом, сообщение будет выглядеть так

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    }

}

1 Ответ

0 голосов
/ 06 августа 2020

Добро пожаловать, Elen1no1Yami!

Мне кажется, проблема в том, что поле text сообщения представляет собой строку, содержащую полезную нагрузку JSON, которая вас интересует, но эта полезная нагрузка имеет двойную - кавычки, экранированные символом \.

Я предполагаю, что данные в самом ActiveMQ не имеют символа \, но было бы хорошо, если бы вы могли прояснить это.

Подходы, которые я вижу для решения этой проблемы, заключаются в следующем:

  1. иметь возможность настроить коннектор так, чтобы НЕ экранировать кавычки в полезной нагрузке. Чтобы сообщение выглядело больше как:
{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    },
    ... etc
}
или каким-то образом попросите ksqlDB обработать сообщение, поскольку оно все еще имеет доступ к JSON в поле text.

Обобщает ли это то, что вы ищете? Если да, обновите свой вопрос, чтобы отразить это. (Хорошо включать такие детали в свой вопрос, чтобы было понятно, о чем вы спрашиваете.

Что касается ответа ...

  1. Я не соединяюсь эксперт, поэтому не могу комментировать и не вижу ничего в деталях конфигурации коннектора , которые могут позволить вам изменить содержимое text. Другие, кто знает больше о Connect, могут чтобы помочь больше.

  2. Чтобы получить доступ к встроенному / экранированному JSON в ksqlDB, вам сначала нужно удалить экранирование. Способы сделать это с помощью ksqlDB см. ниже

Использование ksqlDB для доступа к экранированному JSON

Прежде чем мы сможем получить доступ к документу JSON в text, мы должны удалить экранирование.

I Я могу придумать два способа в верхней части моей головы:

Написать собственный UDF

Лучшим способом было бы написать собственный UDF 'unescape_json`, который мог бы удалить экранирование.

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
  SELECT MY_CUSTOM_UDF(message) FROM RAW;

Если написано правильно, пользовательский подход UDF не будет страдают от потенциальных проблем с повреждением данных, от которых страдает решение на основе REPLACE.

Использование REPLACE для удаления экранирования

ПРИМЕЧАНИЕ : это решение хрупкое: символ замена может соответствовать и заменять то, чего не должно быть, в зависимости от содержания вашего сообщения!

Давайте поработаем с более простыми тестовыми данными, чтобы объяснить, что необходимо, например, мы хотим преобразовать:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": 10}"
}

Кому:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": 10}
}

Для этого необходимы три вещи:

  1. Заменить отверстие "text": "{ на "text": {
  2. Заменить все \" на ".
  3. Заменить закрытие }" на }

Мы можем использовать для этого функцию REPLACE или REGEXP_REPLACE function:

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use REPLACE to remove reformat:
CREATE STREAM JSONIFIED AS 
  SELECT 
    REPLACE(
      REPLACE(
        REPLACE(message, 
          '"text": "{', '"text": {'), 
          '\"', '"'), 
          '"}', '}')
  FROM RAW;

Конечно, это решение может потенциально повредить ваши данные, если они содержат какие-либо условия поиска: "text": "{, \" или "} где-либо еще в ваших данных, например

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": \"hello \\\"} world\"}"
}

Неправильно преобразовано в

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": "hello \\}world"}
}

Th Вот почему предпочтительнее использовать пользовательский UDF.

После того, как вы исправили содержимое введенных вами данных (и записали его в новый topi c), вы можете импортировать свои данные как обычно:

CREATE STREAM DATA (
   messageId STRING,
   text STRUCT<Widget INT>
 ) WITH (
   kafka_topic='JSONIFIED',
   value_format='JSON'
 );
...