Я использую стек ELK. Файлы журнала сохраняются каждые 5 минут с помощью простого приложения Java. Затем Filebeat бросает их в Logsta sh. Из-за перезаписи одинаковые сообщения индексируются (их отпечатки идентичны). Единственным отличием является идентификатор документа. Elasticsearch дает новый идентификатор для документов каждый раз, когда они перезаписываются. Как избавиться от дубликатов или сохранить идентификатор документа таким же?
Logsta sh ввод:
input {
beats {
port => 5044
ssl => false
ssl_certificate => "/etc/pki/tls/certs/logstash-beats.crt"
client_inactivity_timeout => 200
ssl_key => "/etc/pki/tls/private/logstash-beats.key"
}
}
filter {
if [fields][log_type] == "access" {
grok {
match => [ "message", "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{WORD:servername}/%{NOTSPACE:requestpage} HTTP/%{NUMBER:http_version}\" %{NUMBER:server_response} %{NUMBER:answer_size}" ]
}
}
else if [fields][log_type] == "errors" {
grok {
match => {"message" => "%{DATESTAMP:maximotime}(.*)SystemErr"}
}
date {
timezone => "Europe/Moscow"
match => ["maximotime", "dd.MM.yy HH:mm:ss:SSS"]
}
mutate {
copy => { "message" => "key" }
}
mutate {
gsub => [
"message", ".*SystemErr R ", "",
"key", ".*SystemErr R", ""
]
}
truncate {
fields => "key"
length_bytes => 255
}
fingerprint {
method => "SHA1"
source => ["key"]
}
if "_grokparsefailure" in [tags] {
drop { }
}
} else if [fields][log_type] == "info" {
grok {
match => {"message" => ["%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \(выполнение заняло %{NUMBER:execution} миллисекунд\) \{conditions:%{GREEDYDATA:conditions}\}", "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \{conditions:%{GREEDYDATA:conditions}\}", "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \(выполнение заняло %{NUMBER:execution} миллисекунд\)"]}
add_field => {
"type" => "conditions"
}
}
mutate {
convert => {
"execution" => "integer"
}
}
fingerprint {
method => "SHA1"
source => ["message"]
}
if "_grokparsefailure" in [tags] {
grok {
match => {"message" => "%{TIMESTAMP_ISO8601:maximotime} (.*)getMboCount %{WORD:object}: mbosets \(%{WORD:mbosets}\), mbos \(%{WORD:mbos}\)"}
add_field => {
"type" => "maximoObjectCount"
}
remove_tag => ["_grokparsefailure"]
}
mutate {
convert => {
"mbosets" => "integer"
"mbos" => "integer"
}
}
fingerprint {
method => "SHA1"
source => ["message"]
}
if "_grokparsefailure" in [tags] {
drop { }
}
}
date {
timezone => "Europe/Moscow"
match => ["maximotime", "yyyy-MM-dd HH:mm:ss:SSS"]
target => "maximotime"
}
}
}
Logsta sh вывод:
output {
stdout {codec => rubydebug}
if [fields][log_type] == "access" {
elasticsearch {
hosts => ["localhost"]
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
} else if [fields][log_type] == "errors"{
elasticsearch {
hosts => ["localhost"]
manage_template => false
index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
} else if [fields][log_type] == "info"{
elasticsearch {
hosts => ["localhost"]
manage_template => false
index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
document_id => "%{fingerprint}"
}
}
}
Filebeat.yml:
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
processors:
- add_cloud_metadata: ~
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/integration/*.log
fields: {log_type: access}
- type: log
enabled: true
paths:
- /var/log/maximo_error_logs/*.log
fields: {log_type: errors}
exclude_lines: '^((\*+)|Log file started at:)'
multiline.pattern: '(^$|(\t|\s)at .*|.*Caused by:.*|.*SystemErr( ){5}R[ \t]{2}at .*|^ru.ocrv..*|^(\s|\t|)null.*|Обратитесь за.*|.*Закрытое со.*|^(\s|\t|)(ORA-.*|BMX.*)|^(\\s|\t)[А-Яа-я].*)|(.*\d more$)'
multiline.negate: false
multiline.match: after
- type: log
enabled: true
paths:
- /var/log/maximo_logs/*.log
fields: {log_type: info}
output.logstash:
hosts: ["elk:5044"]
bulk_max_size: 200