Прочтите CSV на уровне Logsta sh и отфильтруйте на основе извлеченных данных - PullRequest
0 голосов
/ 17 июня 2020

Я использую Metricbeat для получения данных на уровне процесса и sh их в Elasti c Поиск с использованием Logsta sh.

Теперь цель состоит в том, чтобы разделить процессы на 2 тега, т.е. запущенный процесс - это либо браузер, либо что-то еще.

Я могу сделать это статически, используя этот блок кода:


input {
  beats {
    port => 5044
  }
}
filter{
    if [process][name]=="firefox.exe" or [process][name]=="chrome.exe" {
        mutate {
            add_field => { "process.type" => "browsers" }
            convert => {
            "process.type" => "string"
            }
        }
    }
    else {
        mutate {
            add_field => { "process.type" => "other" }
        } 
    }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    # manage_template => false
    index => "metricbeatlogstash"
  }
}

Но когда я пытаюсь сделать это условие if Dynami c, прочитав список процессов из CSV, я не получаю никаких достоверных результатов в Kibana или ошибки на моем уровне LogSta sh.

Код файла конфигурации CSV выглядит следующим образом:

input {
  beats {
    port => 5044
  }
  file{
        path=>"filePath"
        start_position=>"beginning"
        sincedb_path=>"NULL"
    }
}
filter{
    csv{
        separator=>","
        columns=>["processList","IT"]
    }
    if [process][name] in [processList] {
        mutate {
            add_field => { "process.type" => "browsers" }
            convert => {
            "process.type" => "string"
            }
        }
    }
    else {
        mutate {
            add_field => { "process.type" => "other" }
        } 
    }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    # manage_template => false
    index => "metricbeatlogstash2"
  }
}

1 Ответ

1 голос
/ 18 июня 2020

То, что вы пытаетесь сделать, не работает таким образом в logsta sh, события в конвейере logsta sh независимы друг от друга.

События, полученные вашим beats вводом не знаете о событиях, полученных вашим csv входом, поэтому вы не можете использовать поля из разных событий в условном выражении.

Чтобы делать то, что вы хотите, вы можете использовать фильтр translate со следующим config.

translate {
    field => "[process][name]"
    destination => "[process][type]"
    dictionary_path => "process.csv"
    fallback => "others"
    refresh_interval => 300
}

Этот фильтр будет проверять значение поля [process][name] по словарю, загруженному в память из файла process.csv, словарь - это файл .csv с двумя столбцами, первый - это имя процесса браузера, а второй всегда browser.

chrome.exe,browser
firefox.exe,browser

Если фильтр нашел совпадение, он заполнит поле [process][type] (не process.type) значением из второго столбца, в данном случае всегда browser, если совпадений нет, поле [process][type] заполняется значением конфигурации fallback, в данном случае others, оно также перезагружается содержимое process.csv файл каждые 300 секунд (5 минут)

...