Отправка gzip-журналов Cloudtrail из S3 в ElasticSearch - PullRequest
0 голосов
/ 03 февраля 2019

Я относительно новичок во всей части установки ELK, поэтому, пожалуйста, продолжайте.

То, что я хочу сделать, это отправить журналы Cloudtrail, которые хранятся на S3, в локально (не в AWS, я имею в виду) настройку ELK.Я не использую Filebeat нигде в настройке.Я считаю, что это не обязательно использовать.Logstash может напрямую доставлять данные в ES.

  1. Я здесь?

Как только данные будут в ES, я бы просто хотел визуализировать их в Кибане.

То, что я пробовал до сих пор, учитывая, что мой ELK запущен и что в настройке не задействован Filebeat:

с использованием плагина S3 logstash

содержимое / etc / logstash / conf.d / aws_ct_s3.conf

input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}

output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}

Когда logstash запущен с указанным выше conf, я вижу, что все работает нормально.Используя плагин head для Google Chrome, я вижу, что документы постоянно добавляются в указанный индекс. Фактически, когда я просматриваю его, я вижу, что есть данные, которые мне нужны.Я могу видеть то же самое на стороне Кибаны.

Данные каждого из этих файлов gzip имеют формат:

{
  "Records": [
    dictionary_D1,
    dictionary_D2,
    .
    .
    .
  ]
}

И я хочу, чтобы каждый из этих словарей из списка словарей выше был отдельным событием вKibana.Когда я немного погуглю, я понимаю, что могу использовать фильтр split, чтобы добиться того, чего хочу.И теперь мой aws_ct_s3.conf выглядит примерно так:

input {
s3 {
access_key_id => "access_key_id"
bucket => "bucket_name_here"
secret_access_key => "secret_access_key"
prefix => "AWSLogs/<account_number>/CloudTrail/ap-southeast-1/2019/01/09"
sincedb_path => "/tmp/s3ctlogs.sincedb"
region => "us-east-2"
codec => "json"
add_field => { source => gzfiles }
}
}

filter {
split {
   field => "Records"
 }
}

output {
stdout { codec => json }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "attack-%{+YYYY.MM.dd}"
}
}

И с этим я фактически получаю данные, которые мне нужны, на Кибане.

Теперь проблема заключается в

Без установленного фильтра количество документов, отправляемых Logstash из S3 в Elasticsearch, было в ГБ, а после примененияФильтр остановился только на примерно 5000 документов.

Я не знаю, что я здесь делаю не так.Может кто-нибудь помочь, пожалуйста?

Текущая конфигурация:

java -XshowSettings:vm => Max Heap Size: 8.9 GB

elasticsearch jvm options => max and min heap size: 6GB

logstash jvm options => max and min heap size: 2GB

ES version - 6.6.0

LS version - 6.6.0

Kibana version - 6.6.0

Так выглядит текущее использование кучи: enter image description here

...