Динамическая установка индекса fluentD elasti c -search - PullRequest
0 голосов
/ 27 мая 2020

Я пытаюсь переслать журналы в elasti c -search и застрял с динамической установкой индекса (по полю во входных данных).

Мой формат входных данных - JSON и всегда есть ключ "es_idx". Я использую sh для перенаправления в elasticsearch по этому ключу и добавления его отметки времени, я использую logstash_format true для достижения функции отметки времени и logstash_prefix для установки имени индекса, отличного от «fluentd»

Это как выглядит моя конфигурация fluentd:

# fluentd/conf/fluent.conf
<source>
  type stdin
  # Input pattern. It depends on Parser plugin
  format json

  # Optional. default is stdin.events
</source>

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    host <es-host>
    port<es-port>
    logstash_format true
    logstash_prefix ${$record["es_idx"]}
    type_name fluentd
    flush_interval 5s
  </store>

</match>

При использовании следующего ввода {"tenant_id": "test", "es_idx": "blabla"} я получаю следующую ошибку:

2020-05-27 10:38:06 +0300 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="stdin.events" time=2020-05-27 10:37:59.498450000 +0300 record={"tenant_id"=>"test", "es_idx"=>"blabla"}

Если я устанавливаю logstash_pattern на другую строку вроде этой: «logstash_pattern blabla», она работает нормально.

Кто-нибудь знает, в чем может быть проблема?

Ответы [ 2 ]

1 голос
/ 05 июня 2020

Для использования Dynami c elasti c -search вам необходимо использовать ключи Chunk, как описано здесь В вашем случае вам могут понадобиться такие конфигурации

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    host <es-host>
    port<es-port>
    logstash_format true
    logstash_prefix ${es_idx}
    logstash_dateformat %Y%m%d
    type_name fluentd
    flush_interval 5s

    <buffer es_idx>
      @type file
      path /fluentd/log/elastic-buffer
      flush_thread_count 8
      flush_interval 1s
      chunk_limit_size 32M
      queue_limit_length 4
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </store>
</match>

другой вариант использовать elasticsearch_dynami c

<match my.logs.*>
  @type elasticsearch_dynamic
  hosts ${record['host1']}:9200,${record['host2']}:9200
  index_name my_index.${Time.at(time).getutc.strftime(@logstash_dateformat)}
  logstash_prefix ${tag_parts[3]}
  port ${9200+rand(4)}
  index_name ${tag_parts[2]}-${Time.at(time).getutc.strftime(@logstash_dateformat)}
</match>
0 голосов
/ 08 июня 2020

удалось получить значение из объекта записи, например:

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    @log_level debug
    host <host>
    logstash_format true
    logstash_prefix ${es_index_pattern}
    type_name fluentd
    flush_interval 5s
    <buffer tag, es_index_pattern>
      @type memory
    </buffer>
  </store>
</match>
...