Добро пожаловать, Elen1no1Yami!
Мне кажется, проблема в том, что поле text
сообщения представляет собой строку, содержащую полезную нагрузку JSON, которая вас интересует, но эта полезная нагрузка имеет двойную - кавычки, экранированные символом \
.
Я предполагаю, что данные в самом ActiveMQ не имеют символа \
, но было бы хорошо, если бы вы могли прояснить это.
Подходы, которые я вижу для решения этой проблемы, заключаются в следующем:
- иметь возможность настроить коннектор так, чтобы НЕ экранировать кавычки в полезной нагрузке. Чтобы сообщение выглядело больше как:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {
"widget": {
"debug": "on",
"window": {
"title": "Sample Konfabulator Widget",
"name": "main_window",
"width": 500,
"height": 500
},
"image": {
"src": "Images/Sun.png",
"name": "sun1",
"hOffset": 250,
"vOffset": 250,
"alignment":
"center"
},
... etc
}
или каким-то образом попросите ksqlDB обработать сообщение, поскольку оно все еще имеет доступ к JSON в поле
text
.
Обобщает ли это то, что вы ищете? Если да, обновите свой вопрос, чтобы отразить это. (Хорошо включать такие детали в свой вопрос, чтобы было понятно, о чем вы спрашиваете.
Что касается ответа ...
Я не соединяюсь эксперт, поэтому не могу комментировать и не вижу ничего в деталях конфигурации коннектора , которые могут позволить вам изменить содержимое text
. Другие, кто знает больше о Connect, могут чтобы помочь больше.
Чтобы получить доступ к встроенному / экранированному JSON в ksqlDB, вам сначала нужно удалить экранирование. Способы сделать это с помощью ksqlDB см. ниже
Использование ksqlDB для доступа к экранированному JSON
Прежде чем мы сможем получить доступ к документу JSON в text
, мы должны удалить экранирование.
I Я могу придумать два способа в верхней части моей головы:
Написать собственный UDF
Лучшим способом было бы написать собственный UDF 'unescape_json`, который мог бы удалить экранирование.
-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
message STRING
) WITH (
KAFKA_TOPIC=<something>,
VALUE_FORMAT='KAFKA'
);
-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
SELECT MY_CUSTOM_UDF(message) FROM RAW;
Если написано правильно, пользовательский подход UDF не будет страдают от потенциальных проблем с повреждением данных, от которых страдает решение на основе REPLACE
.
Использование REPLACE
для удаления экранирования
ПРИМЕЧАНИЕ : это решение хрупкое: символ замена может соответствовать и заменять то, чего не должно быть, в зависимости от содержания вашего сообщения!
Давайте поработаем с более простыми тестовыми данными, чтобы объяснить, что необходимо, например, мы хотим преобразовать:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": 10}"
}
Кому:
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {"widget": 10}
}
Для этого необходимы три вещи:
- Заменить отверстие
"text": "{
на "text": {
- Заменить все
\"
на "
. - Заменить закрытие
}"
на }
Мы можем использовать для этого функцию REPLACE или REGEXP_REPLACE function:
-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
message STRING
) WITH (
KAFKA_TOPIC=<something>,
VALUE_FORMAT='KAFKA'
);
-- Use REPLACE to remove reformat:
CREATE STREAM JSONIFIED AS
SELECT
REPLACE(
REPLACE(
REPLACE(message,
'"text": "{', '"text": {'),
'\"', '"'),
'"}', '}')
FROM RAW;
Конечно, это решение может потенциально повредить ваши данные, если они содержат какие-либо условия поиска: "text": "{
, \"
или "}
где-либо еще в ваших данных, например
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": \"hello \\\"} world\"}"
}
Неправильно преобразовано в
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {"widget": "hello \\}world"}
}
Th Вот почему предпочтительнее использовать пользовательский UDF.
После того, как вы исправили содержимое введенных вами данных (и записали его в новый topi c), вы можете импортировать свои данные как обычно:
CREATE STREAM DATA (
messageId STRING,
text STRUCT<Widget INT>
) WITH (
kafka_topic='JSONIFIED',
value_format='JSON'
);