Как извлечь определенные поля из входных данных JSON с помощью фильтров Logstash? - PullRequest
0 голосов
/ 20 марта 2019

В настоящее время я пытаюсь проанализировать строку JSON с помощью Logstash для отправки в базу данных Elastic Search. Входная строка выглядит следующим образом:

"message": """{"resourceId":"201987-20190320-05307201","body":{"orderNumber":"201987-20190320-05307201","wrapping1":{"name":null,"price":null,"taxFlag":null},"wrapping2":{"name":null,"price":null,"taxFlag":null},"package":{"608738":{"goodsTax":222,"postagePrice":0,"deliveryPrice":0,"sender":{"isOrderer":true,"familyName":"Smith",action":"CREATE","timestampMs":1553044764100}"""

Я пробую несколько разных методов для извлечения поля «resourceId» при удалении всех остальных, но они не увенчались успехом. Я получаю сообщение об ошибке:

Could not index event to Elasticsearch.
...
Limit of total fields [5000] in index [test2-rt_platform] has been exceeded

Мои настройки следующие:

input {
  kafka {
    # Target servers
    bootstrap_servers => "myMachine:9002"

    # Topic and consumer settings
    topics => ["myTopic"]
    group_id => "theConsumer"
    consumer_threads => 1

    decorate_events => true

    # Output format settings
    codec => json

    # Performance settings
    auto_commit_interval_ms => "10000"
    auto_offset_reset => "latest"
    request_timeout_ms => "7000"
    session_timeout_ms => "6000"
    heartbeat_interval_ms => "2000"
    poll_timeout_ms => 2000
    retry_backoff_ms => "1000"
    max_partition_fetch_bytes => "10485760"

  }
}

filter {
  mutate {
    add_field => {
      "[@metadata][index]" => "platform_test_%{[kafka][topic]}"
      "[@metadata][format]" => "%{[kafka][topic]}"
    }
  }
  json {
      source => "message"
  }

  mutate {
        add_field => {
            "orderNumber22" => "%{[body][orderNumber]}"
        }

        add_field => {
            "orderNumber33" => [ "[body][orderNumber]" ]
        }
        add_field => {
           "orderNumber44" => "%{[message][body][orderNumber]}"
        }

        add_field => {
            "orderNumber55" => [ "[message][body][orderNumber]" ]
        }
        remove_field => [ "[message]" ]
}


  fingerprint {
    method => "MD5"
    key => "%{[kafka][topic]}"
    target => "[@metadata][fingerprint]"
  }
}

output {
  elasticsearch {
    action => "index"
    flush_size => 100
    document_id => "%{[@metadata][fingerprint]}"
    document_type => "%{[@metadata][format]}"
    hosts => ["myMachine:9002"]
    index => "test2-rt_platform"
    retry_max_interval => 5
    timeout => 10000
    user => "myUser"
    password => "myPassword"
  }
  stdout {
    codec => rubydebug { metadata => true }
  }

}
...