Как преобразовать кинезис в json с помощью функции aws -lambda в python3? - PullRequest
0 голосов
/ 20 апреля 2020

Итак, я попытался прочитать журналы потоков Kinesis с помощью лямбда-функции и массово опубликовать их в Elasticsearch:

lambda.py:

(...) #iterating
    payload = payload + "{'index': {'_type': '" + doc_type + "','_index': '" + index_prefix
    payload = payload + "'}}\n" + str(document) + "\n"

    countProcessedLogs += 1
  count += 1
responseString = 'Processed ' + str(count) + ' items from the batch resulting into ' + str(countProcessedLogs) + ' processed logs.'
return payload, responseString


def handler(event, context):
    url = host + '/_bulk'
    try:
        jsonDocument, response = handlerLogic(event)
        jsonDocumentSize = len(jsonDocument.encode('utf-8'))

        if jsonDocumentSize > 0:
            r = requests.post(url, auth=awsauth, headers=headers, data=jsonDocument.replace("'", '"'))
            print(r.request.body)
(...)

И я успешно получил их в переменной jsonDocument , Затем я отправился проверить эластичный поиск и убедился, что туда попадают не все журналы. Поэтому я решил напечатать тело запроса, что застало меня врасплох, потому что некоторые журналы имеют правильный формат json, а другие нет, но я не могу понять, почему. Как вы можете заметить, я уже пытался реализовать некоторые замены ("" для "").

Ниже приведен фрагмент тела запроса:

{
    "index": {
        "_type": "logs",
        "_index": "index"
    }
}
\n{"logStream": "awslogs-x/x/123", "logGroup": "/v/r/cluster/default", "timestamp": "2020/04/20 15:32:21", "t": "v", "component": "v", "c": "v", "environment": "r", "message": "\\x1b[0m\\x1b[33m15:32:21,409 WARN  [org.jboss.as.domain.management.security] (MSC service thread 1-1) WFLYDM0111: Keystore /opt/jboss/k/standalone/configuration/application.keystore not found, it will be auto generated on first use with a self signed certificate for host localhost\\n", "type": "warn", "application": "k", "class_name": None, "event_type": "not_specified"}\n
{
    "index": {
        "_type": "logs",
        "_index": "index"
    }
}
\n{"logStream": "awslogs-x/x/123", "logGroup": "/v/r/cluster/default", "timestamp": "2020/04/20 15:32:21", "t": "v", "component": "v", "c": "v", "environment": "r", "message": "\\x1b[0m\\x1b[0m15:32:21,539 INFO  [org.jboss.as.server] (Controller Boot Thread) WFLYSRV0212: Resuming server\\n", "type": "info", "application": "k", "class_name": None, "event_type": "not_specified"}\n
{
    "index": {
        "_type": "logs",
        "_index": "index"
    }
}
\n
{
    "logStream": "awslogs-x/x/123",
    "logGroup": "/v/r/cluster/default",
    "timestamp": "2020/04/20 15:32:21",
    "t": "v",
    "component": "v",
    "c": "v",
    "environment": "r",
    "message": "\\x1b[0m\\x1b[0m15:32:21,541 INFO  [org.jboss.as] (Controller Boot Thread) WFLYSRV0025: K 4.5.0.Final (WildFly Core 5.0.0.Final) started in 2495ms - Started 65 of 84 services (30 services are lazy, passive or on-demand)\\n\\x1b[0mThe batch executed successfully\\n",
    "type": "info",
    "application": "k",
    "class_name": "[org.jboss.as] (controller boot thread) wflysrv0025: k 4.5.0.final (wildfly core 5.0.0.final) started in 2495ms",
    "event_type": "not_specified"
}
\n

Можете ли вы помогите понять, почему существуют какие-то «неформатированные» журналы?

...