Экранированный JSON недействителен JSON, что, вероятно, усложнило бы его :)
В этом фрагменте:
…\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\…
ведущий двойной цитата для o_name
не экранирована. Вы можете проверить это с помощью чего-то вроде jq
:
echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
parse error: Invalid numeric literal at line 1, column 685
С исправленным JSON это затем будет успешно проанализировано:
➜ echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_m
ethod\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_
wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddd
dddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
{
"server_name": "xxxxxxxxxxxxxxx",
"remote_address": "10.x.x.x",
"user": "xxxxxx",
"timestamp_start": "xxxxxxxx",
"timestamp_finish": "xxxxxxxxxx",
"time_start": "10/Mar/2020:07:10:39 +0000",
"time_finish": "10/Mar/2020:07:10:39 +0000",
"request_method": "PUT",
"request_uri": "xxxxxxxxxxxxxxxxxxxxxxx",
"protocol": "HTTP/1.1",
"status": 200,
…
Так что теперь давайте перенесем это в ksqlDB , Я использую kafkacat для загрузки его в топи c:
kafkacat -b localhost:9092 -t testing -P<<EOF
{ "@timestamp": "XXXXXXXX", "beat": { "hostname": "xxxxxxxxxx", "name": "xxxxxxxxxx", "version": "5.2.1" }, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
EOF
Теперь с помощью ksqlDB давайте объявим схему схемы, в которой поле message
представляет собой просто комок VARCHAR
:
CREATE STREAM TEST (BEAT STRUCT<HOSTNAME VARCHAR, NAME VARCHAR, VERSION VARCHAR>,
INPUT_TYPE VARCHAR,
MESSAGE VARCHAR,
OFFSET BIGINT,
SOURCE VARCHAR)
WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');
Мы можем запросить этот поток, чтобы убедиться, что он работает:
SET 'auto.offset.reset' = 'earliest';
SELECT BEAT->HOSTNAME,
BEAT->VERSION,
SOURCE,
MESSAGE
FROM TEST
EMIT CHANGES LIMIT 1;
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|BEAT__HOSTNAME |BEAT__VERSION |SOURCE |MESSAGE |
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|xxxxxxxxxx |5.2.1 |/var/log/xxxx.log |{"server_name":"xxxxxxxxxxxxxxx","remote_address":"10.x.x.x","user":|
| | | |"xxxxxx","timestamp_start":"xxxxxxxx","timestamp_finish":"xxxxxxxxxx|
| | | |","time_start":"10/Mar/2020:07:10:39 +0000","time_finish":"10/Mar/20|
| | | |20:07:10:39 +0000","request_method":"PUT","request_uri":"xxxxxxxxxxx|
| | | |xxxxxxxxxxxx","protocol":"HTTP/1.1","status":200,"response_length":"|
| | | |0","request_length":"0","user_agent":"xxxxxxxxx","request_id":"zzzzz|
| | | |zzzzzzzzzzzzzzzz","request_type":"zzzzzzzz","stat":{"c_wait":0.004,"|
| | | |s_wait":0.432,"digest":0.0,"commit":31.878,"turn_around_time":0.0,"t|
| | | |_transfer":32.319},"object_length":"0","o_name":"xxxxx","https":{"pr|
| | | |otocol":"TLSv1.2","cipher_suite":"TLS_RSA_WITH_AES_256_GCM_SHA384"},|
| | | |"principals":{"identity":"zzzzzz","asv":"dddddddddd"},"type":"http",|
| | | |"format":1} |
Limit Reached
Query terminated
Теперь давайте извлечем встроенные поля JSON, используя функцию EXTRACTJSONFIELD
не все поля, просто несколько из них, чтобы проиллюстрировать образец для подражания):
SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address') AS REMOTE_ADDRESS,
EXTRACTJSONFIELD(MESSAGE,'$.time_start') AS TIME_START,
EXTRACTJSONFIELD(MESSAGE,'$.protocol') AS PROTOCOL,
EXTRACTJSONFIELD(MESSAGE,'$.status') AS STATUS,
EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait') AS STAT_C_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait') AS STAT_S_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.digest') AS STAT_DIGEST,
EXTRACTJSONFIELD(MESSAGE,'$.stat.commit') AS STAT_COMMIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer') AS STAT_T_TRANSFER
FROM TEST
EMIT CHANGES LIMIT 1;
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|REMOTE_ADDRESS |TIME_START |PROTOCOL |STATUS |STAT_C_WAIT |STAT_S_WAIT |STAT_DIGEST |STAT_COMMIT |STAT_TURN_AROUND_TIME |STAT_T_TRANSFER |
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|10.x.x.x |10/Mar/2020:07:10:39 +0000|HTTP/1.1 |200 |0.004 |0.432 |0 |31.878 |0 |32.319 |
Мы можем сохранить это для новой топики Кафки c, и для хорошей меры повторно выполнить ее до Avro, чтобы облегчить использование нижестоящих приложений:
CREATE STREAM BEATS WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address') AS REMOTE_ADDRESS,
EXTRACTJSONFIELD(MESSAGE,'$.time_start') AS TIME_START,
EXTRACTJSONFIELD(MESSAGE,'$.protocol') AS PROTOCOL,
EXTRACTJSONFIELD(MESSAGE,'$.status') AS STATUS,
EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait') AS STAT_C_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait') AS STAT_S_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.digest') AS STAT_DIGEST,
EXTRACTJSONFIELD(MESSAGE,'$.stat.commit') AS STAT_COMMIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer') AS STAT_T_TRANSFER
FROM TEST
EMIT CHANGES LIMIT 1;
ksql> DESCRIBE BEATS;
Name : BEATS
Field | Type
---------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
REMOTE_ADDRESS | VARCHAR(STRING)
TIME_START | VARCHAR(STRING)
PROTOCOL | VARCHAR(STRING)
STATUS | VARCHAR(STRING)
STAT_C_WAIT | VARCHAR(STRING)
STAT_S_WAIT | VARCHAR(STRING)
STAT_DIGEST | VARCHAR(STRING)
STAT_COMMIT | VARCHAR(STRING)
STAT_TURN_AROUND_TIME | VARCHAR(STRING)
STAT_T_TRANSFER | VARCHAR(STRING)
---------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Для устранения проблем с ksqlDB, возвращающими значения NULL, ознакомьтесь с этой статьей . Чаще всего это связано с ошибками сериализации. Например, если вы посмотрите журнал сервера ksqlDB, вы увидите эту ошибку, когда он попытается проанализировать неправильно сформированный экранированный символ JSON до того, как я его исправлю:
WARN Exception caught during Deserialization, taskId: 0_0, topic: testing, partition: 0, offset: 1 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: mvn value from topic: testing
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('o' (code 111)): was expecting comma to separate Object entries
at [Source: (byte[])"{"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\
"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HT"[truncated 604 bytes];
line: 1, column: 827]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:986)
…