Logstash: отправить событие в другое место, если вывод не удался - PullRequest
0 голосов
/ 25 мая 2018

Предоставление следующего конвейера logstash:

input
{
    generator
    {
        lines => [
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"hello": "world"}',
        '{"name" :"wrong data", "data" : "I am wrong !"}',
        '{"name" :"wrong data", "data" : { "hello" : "world" }}'
        ]
        codec => json
        count => 1
    }
}

filter
{
  mutate
  {
    remove_field => ["sequence", "host", "@version"]
  }
}

output
{
   elasticsearch
   {
     hosts => ["elasticsearch:9200"]
     index => "events-dev6-test"
     document_type => "_doc"
     manage_template => false
   }

   stdout
   {
       codec => rubydebug
   }
}

asticsearch имеет строгое отображение для этого индекса, следовательно, некоторые события дают ошибку 400 "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed" (что нормально).

Как отправить сбойные события в другое место (текстовые журналы или другой индекс эластичного поиска) (чтобы я не терял события)?

1 Ответ

0 голосов
/ 25 мая 2018

Введено Logstash 6.2 Очереди недоставленных сообщений , которые можно использовать, чтобы делать то, что вы хотите.Вам нужно будет включить dead_letter_queue.enable: true в вашем logstash.yml.

, а затем просто работать с ним как с входом:

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue" 
    commit_offsets => true 
    pipeline_id => "main" 
  }
}

output {
  file {
    path => ...
       codec => line { format => "%{message}"}
   }    
}

До 6.2 я не верюбыл способ сделать то, что вы хотите.

...