Как объединить несколько входов JDBC в конфигурации logstash на основе условия одного столбца? - PullRequest
0 голосов
/ 19 декабря 2018

У меня есть две таблицы на сервере sql, т.е. AppDetails и AppBranchDetails.Я хочу прочитать все строки этих двух таблиц и объединить их в зависимости от условия.

Ниже приведены два запроса, которые я хочу выполнить:

  1. выберите id в качестве colg_id, name,sirname из порядка AppDetails по идентификатору
  2. выберите идентификатор в качестве идентификатора филиала, идентификатора ветви, ветви_адд из идентификатора AppBranchDetails по идентификатору

Из приведенных выше двух запросов "id" является первичным ключом, который является тем жедля обеих таблиц.

Вывод будет выглядеть ниже для id == 1 :

{
  "name": "ram",
  "sirname": "patil",
  "id": 1,
  "BRANCH": [
    {
      "id": 1,
      "branch_name": "EE",
      "branch_add": "IND"
    },
    {
      "id": 1,
      "branch_name": "ME",
      "branch_add": "IND"
    }
  ]
}

Вывод будет выглядеть ниже для id == 2 :

{
  "name": "sham",
  "sirname": "bhosle",
  "id": 2,
  "BRANCH": [
    {
      "id": 2,
      "branch_name": "SE",
      "branch_add": "US"
    },
    {
      "id": 2,
      "branch_name": "FE",
      "branch_add": "US"
    }
  ]
}

Я пытаюсь использовать следующую конфигурацию (app.conf):

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "sa"
    jdbc_password => "sa@111"
    statement => "select id as colg_id, name, sirname from AppDetails order by id"
    tracking_column => "colg_id"
    use_column_value => true
    type => "college"
  }
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "sa"
    jdbc_password => "sa@111"
    statement => "select id as branch_id, branch_name, branch_add from AppBranchDetails order by id"
    tracking_column => "branch_id"
    use_column_value => true
    type => "branch"
  }
}

filter {
    if [type] == "branch" {
        aggregate {
        task_id => "%{branch_id}"
        code => "
            map['BRANCH'] ||= []
            map['BRANCH'] << event.get('branch_id')
            map['BRANCH'] << event.get('branch_name')
            map['BRANCH'] << event.get('branch_add')
            event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 5
        }
        mutate {
            remove_field => [ "@version" , "@timestamp" ]
        }
    }
}

output {
  stdout { codec => json_lines }
}

Может кто-нибудь подсказать, как я могу добиться результата, который я упомянулвыше.

...