How to select a sub-value of JSON in a topic as a ksql stream
0 votes
/ March 11, 2020

I have many topics in Kafka with this format: value: {big json string with many subkeys etc}.

The output of print topic looks like this:

rowtime: 3/10/20 7:10:43 AM UTC, key: , value: {"@timestamp":"XXXXXXXX","beat":{"hostname":"xxxxxxxxxx","name":"xxxxxxxxxx","version":"5.2.1"},"input_type":"log","log_dc":"xxxxxxxxxxx","message":"{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}","offset":70827770,"source":"/var/log/xxxx.log","type":"topicname"}

I tried using

CREATE STREAM test
 (value STRUCT<
    server_name VARCHAR,
    remote_address VARCHAR,
    forwarded_for VARCHAR,
    remote_user VARCHAR,
    timestamp_start VARCHAR
 ..

WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

But I get a stream whose value is NULL. Is there a way to get at the keys under value?

1 Answer

0 votes
/ March 12, 2020

The escaped JSON is not valid JSON, which has probably made this more difficult :)

In this snippet:

…\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\…

the leading double quote for o_name is not escaped. You can validate this with something like jq:

echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
parse error: Invalid numeric literal at line 1, column 685

With the JSON fixed, it then parses successfully:

echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
{
  "server_name": "xxxxxxxxxxxxxxx",
  "remote_address": "10.x.x.x",
  "user": "xxxxxx",
  "timestamp_start": "xxxxxxxx",
  "timestamp_finish": "xxxxxxxxxx",
  "time_start": "10/Mar/2020:07:10:39 +0000",
  "time_finish": "10/Mar/2020:07:10:39 +0000",
  "request_method": "PUT",
  "request_uri": "xxxxxxxxxxxxxxxxxxxxxxx",
  "protocol": "HTTP/1.1",
  "status": 200,
…
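
If jq is not to hand, the same two-stage check can be sketched in Python: parse the outer record, then parse the string held in message. This is only an illustration; the helper names decode_embedded and explain_failure and the shortened sample records are assumptions made for the example, not anything from ksqlDB or from the original data.

```python
import json


def decode_embedded(raw: str, field: str = "message"):
    """Parse the outer JSON document, then parse the JSON string stored in `field`."""
    outer = json.loads(raw)          # raises json.JSONDecodeError if the envelope is malformed
    return json.loads(outer[field])  # the embedded payload is itself a JSON-encoded string


def explain_failure(raw: str, width: int = 20) -> str:
    """Return 'ok', or the parser message plus the text around the failing position."""
    try:
        decode_embedded(raw)
        return "ok"
    except json.JSONDecodeError as err:
        start = max(err.pos - width, 0)
        return f"{err.msg} at char {err.pos}: ...{raw[start:err.pos + width]}..."


# Shortened, hypothetical stand-ins for the full record shown above.
good = r'{"message": "{\"object_length\":\"0\",\"o_name\":\"xxxxx\"}"}'
bad = r'{"message": "{\"object_length\":\"0\","o_name\":\"xxxxx\"}"}'

print(decode_embedded(good)["o_name"])  # prints xxxxx
print(explain_failure(bad))             # reports the position of the unescaped quote
```

The unescaped quote ends the message string early, so it is the outer document that fails to parse, and the reported position points at o_name.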

So now let's get this into ksqlDB. I'm using kafkacat to load it into a topic:

kafkacat -b localhost:9092 -t testing -P<<EOF
{ "@timestamp": "XXXXXXXX", "beat": { "hostname": "xxxxxxxxxx", "name": "xxxxxxxxxx", "version": "5.2.1" }, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
EOF

Now, with ksqlDB, let's declare the outline schema, in which the message field is just a lump of VARCHAR:

CREATE STREAM TEST (BEAT STRUCT<HOSTNAME VARCHAR, NAME VARCHAR, VERSION VARCHAR>,
                    INPUT_TYPE VARCHAR, 
                    MESSAGE VARCHAR, 
                    OFFSET BIGINT, 
                    SOURCE VARCHAR) 
            WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

We can query this stream to make sure it's working:

SET 'auto.offset.reset' = 'earliest';
SELECT BEAT->HOSTNAME, 
       BEAT->VERSION, 
       SOURCE, 
       MESSAGE 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|BEAT__HOSTNAME   |BEAT__VERSION  |SOURCE              |MESSAGE                                                             |
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|xxxxxxxxxx       |5.2.1          |/var/log/xxxx.log   |{"server_name":"xxxxxxxxxxxxxxx","remote_address":"10.x.x.x","user":|
|                 |               |                    |"xxxxxx","timestamp_start":"xxxxxxxx","timestamp_finish":"xxxxxxxxxx|
|                 |               |                    |","time_start":"10/Mar/2020:07:10:39 +0000","time_finish":"10/Mar/20|
|                 |               |                    |20:07:10:39 +0000","request_method":"PUT","request_uri":"xxxxxxxxxxx|
|                 |               |                    |xxxxxxxxxxxx","protocol":"HTTP/1.1","status":200,"response_length":"|
|                 |               |                    |0","request_length":"0","user_agent":"xxxxxxxxx","request_id":"zzzzz|
|                 |               |                    |zzzzzzzzzzzzzzzz","request_type":"zzzzzzzz","stat":{"c_wait":0.004,"|
|                 |               |                    |s_wait":0.432,"digest":0.0,"commit":31.878,"turn_around_time":0.0,"t|
|                 |               |                    |_transfer":32.319},"object_length":"0","o_name":"xxxxx","https":{"pr|
|                 |               |                    |otocol":"TLSv1.2","cipher_suite":"TLS_RSA_WITH_AES_256_GCM_SHA384"},|
|                 |               |                    |"principals":{"identity":"zzzzzz","asv":"dddddddddd"},"type":"http",|
|                 |               |                    |"format":1}                                                         |
Limit Reached
Query terminated

Now let's extract the embedded JSON fields using the EXTRACTJSONFIELD function (I've not done every field, just a handful of them to illustrate the pattern to follow):

SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
       EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
       EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
       EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|REMOTE_ADDRESS  |TIME_START                |PROTOCOL  |STATUS  |STAT_C_WAIT |STAT_S_WAIT  |STAT_DIGEST |STAT_COMMIT |STAT_TURN_AROUND_TIME |STAT_T_TRANSFER |
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|10.x.x.x        |10/Mar/2020:07:10:39 +0000|HTTP/1.1  |200     |0.004       |0.432        |0           |31.878      |0                     |32.319          |
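
To make the path syntax concrete, here is a rough Python sketch of a dotted-path lookup of the kind the '$.stat.c_wait' arguments describe. It is a simplified stand-in, not ksqlDB's implementation: the function name extract_json_field is hypothetical, only plain $.a.b paths are handled, and returning None for unparseable input or a missing key is an assumption of the sketch.

```python
import json


def extract_json_field(document: str, path: str):
    """Look up a '$.a.b' style path in a JSON string; return the value as text, or None."""
    if not path.startswith("$."):
        raise ValueError("this sketch only handles paths of the form $.a.b")
    try:
        node = json.loads(document)
    except json.JSONDecodeError:
        return None  # assumption: unparseable input yields no value
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # assumption: a missing key yields no value
        node = node[key]
    # Strings are returned as-is; anything else is rendered as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


# Shortened, hypothetical version of the MESSAGE value shown above.
message = '{"remote_address":"10.x.x.x","status":200,"stat":{"c_wait":0.004,"s_wait":0.432}}'

print(extract_json_field(message, "$.remote_address"))  # prints 10.x.x.x
print(extract_json_field(message, "$.status"))          # prints 200
print(extract_json_field(message, "$.stat.c_wait"))     # prints 0.004
print(extract_json_field(message, "$.stat.missing"))    # prints None
```

Every result comes back as text, even for numeric fields such as status, so a numeric column would need an explicit conversion afterwards.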

We can persist this to a new Kafka topic, and for good measure reserialise it to Avro to make it easier for downstream applications to use:

CREATE STREAM BEATS WITH (VALUE_FORMAT='AVRO') AS
    SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
        EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
        EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
        EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
    FROM TEST 
    EMIT CHANGES;
ksql> DESCRIBE BEATS;

Name                 : BEATS
 Field                 | Type
---------------------------------------------------
 ROWTIME               | BIGINT           (system)
 ROWKEY                | VARCHAR(STRING)  (system)
 REMOTE_ADDRESS        | VARCHAR(STRING)
 TIME_START            | VARCHAR(STRING)
 PROTOCOL              | VARCHAR(STRING)
 STATUS                | VARCHAR(STRING)
 STAT_C_WAIT           | VARCHAR(STRING)
 STAT_S_WAIT           | VARCHAR(STRING)
 STAT_DIGEST           | VARCHAR(STRING)
 STAT_COMMIT           | VARCHAR(STRING)
 STAT_TURN_AROUND_TIME | VARCHAR(STRING)
 STAT_T_TRANSFER       | VARCHAR(STRING)
---------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

To debug issues with ksqlDB returning NULLs, check out this article. Much of the time it comes down to serialisation errors. For example, if you look at the ksqlDB server log, you'll see this error from when it tried to parse the badly-formed escaped JSON before I fixed it:

WARN Exception caught during Deserialization, taskId: 0_0, topic: testing, partition: 0, offset: 1 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: mvn value from topic: testing
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('o' (code 111)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\
"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HT"[truncated 604 bytes];
 line: 1, column: 827]
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:986)
…
...