Фильтр Logstash для анализа журналов Python Cellery как JSON - PullRequest
0 голосов
/ 23 октября 2018

У меня есть приложение для cellery, назовем его родовым myapp.Журналы настроены в формате JSON.Мы получаем некоторые записи от некоторых операторов, с которыми мы работаем, и, к сожалению, эти записи, фактически конкретная запись не в формате JSON.Я пытаюсь создать фильтр logstash, чтобы я мог импортировать эти журналы вasticsearch, в определенный индекс, называемый myapp.

Выдержка из журнала:

"rpc_method": "get_comm_details", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:24.165372Z"}
{"event": "Task succeeded", "id": "890a084e-5ff8-4d3f-aedb-2a8688bb62ff", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "user", "rpc_method": "get_comm_details", "rpc_params": [52683], "runtime": 0.3010744289495051, "service": "core", "timestamp": "2018-10-16T20:23:24.466367Z"}


{"event": "Received task", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:25.307046Z"}

{"event": "Task accepted", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "pid": 140559562601968, "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:25.308901Z"}

{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}

{"event": "Received task", "id": "23fb661f-a256-488a-bf22-3680fe4d4f32", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.SilentRPCTask", "rpc_endpoint": "analytics.fact", "rpc_method": "add", "rpc_params": {"city_id": null, "correlation_id": "6ad37ee2-e4a8-46ab-9dad-fb3d16918fcc", "country_id": null, "dst_is_platform": true, "dst_ln": 21580012, "dst_mid": null, "dst_operator_id": null, "dst_package_id": null, "dst_user_id": null, "duration": null, "ignore_errors": true, "medium": null, "platform_event": "sms-in", "service": "wallet", "service_event": null, "service_path": null, "sms_in_size": 1, "sms_out_size": null, "src_is_platform": false, "src_ln": 647234242, "src_mid": null, "src_operator_id": 300046, "src_package_id": null, "src_user_id": 313676, "ts_end": null, "ts_start": null}, "timestamp": "2018-10-16T20:23:27.739982Z"}

Журнал вышеправильно индексируется вasticsearch как JSON.К сожалению, в журнале также есть строки, которые выглядят так:

{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}
{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [[77621]], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}

, которые не в формате JSON.Какой тип фильтра я должен использовать для преобразования только таких записей, как: "rpc_params": [value] или "rpc_params": [[value]] в JSON илистрока?Проблема заключается в том, что в журнале появляется точно такой же rpc_params, что и в JSON, но в некоторых других случаях он представляется в виде массива.

Журналы отправляются на эластичный поиск с использованием filebeat, и они сначала проходят через logstash.Мой фильтр logstash выглядит так:

filter {
  if "myapp" in [tags] {
    json {
      source => "message"
    }
    }
  }
}

Более того, в журнале также есть записи, которые выглядят так: "rpc_params": [[12312312, 12313123]]

На мой взгляд, лучше всего было бы создать фильтр, который преобразует [] и [[]] в "

Я пытался использовать фильтр мутаций, даже фильтр gsub, некоторые кодеки ruby, но, похоже, ничего не работает или, возможно, я делаю это неправильно.

Любая помощь будет очень признательна.

...