Фильтрация данных JDBC в Logstash - PullRequest
1 голос
/ 02 октября 2019

В моей БД у меня есть данные в следующем формате:

enter image description here

Но в ElasticSearch я хочу выдвинуть данные относительно элементатипы. Таким образом, каждая запись в ElasticSearch будет перечислять все имена элементов и их значения для каждого типа элементов.

Примерно так:

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "10",
  "_source": {
    "item_type: "10",
    "fruits": "20",
    "veggies": "32",
    "butter": "11",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "11",
  "_source": {
    "item_type: "11",
    "hair gel": "50",
    "shampoo": "35",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "12",
  "_source": {
    "item_type: "12",
    "tape": "9",
    "10mm screw": "7",
    "blinker fluid": "78",
  }
}

Могу ли я добиться этого в Logstash?

Я новичок в Logstash, но, насколько я понимаю, это можно сделать в filter. Но я не уверен, какой фильтр использовать, или я должен создать собственный фильтр для этого.

Текущий пример конфигурации:

input {
  jdbc {
    jdbc_driver_library => "ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "myjdbc-configs"
    jdbc_user => "dbuser"
    jdbc_password => "dbpasswd"
    schedule => "* * * * *"
    statement => "SELECT * from item_table"
  }
}
filter {
    ## WHAT TO WRITE HERE??
}
output {
    elasticsearch {
        hosts => [ "http://myeshost/" ]
        index => "myindex"
    }
}

Предлагаем, Спасибо.

1 Ответ

2 голосов
/ 02 октября 2019

Этого можно добиться, используя плагин совокупного фильтра . Я не проверял ниже, но должен дать вам представление.

 filter {     
      aggregate {
        task_id => "%{item_type}" #
        code => "
          map['Item_type'] = event.get('Item_type')
          map[event.get('Item_Name')] = map[event.get('Item_Value')]
        "
        push_previous_map_as_event => true
        timeout => 3600
        timeout_tags => ['_aggregatetimeout']
      }
      if "aggregated" not in [tags] {
        drop {}
      }
    }

Важные предостережения для использования совокупного фильтра :

  • SQL-запрос ДОЛЖЕН заказатьрезультаты по Item_Type, поэтому события не в порядке.
  • Имена столбцов в запросе sql должны совпадать с именами столбцов в фильтре map[]
  • Для агрегатов следует использовать ТОЛЬКО ОДИН рабочий потокв противном случае события могут обрабатываться не по порядку, и могут возникнуть непредвиденные результаты.
...