Объединить информацию из отдельных баз данных в один документ в Logstash - PullRequest
0 голосов
/ 25 сентября 2019

Я хотел бы объединить информацию из двух разных таблиц в разных базах данных.В таблице t_a из базы данных db_a у меня есть информация о пользователях, такая как имя пользователя, имя, номер телефона и т. Д. В таблице t_b из базы данных db_b у меня есть специфическая для системы информация о пользователях.Например, для этой системы пользователь X имеет тип Y. Таблица t_b имеет поле, указывающее идентификатор пользователя в таблице t_a, и именно так я могу связать две таблицы.Я хочу сгруппировать пользователей по имени пользователя в Elastic, и внутри каждого документа будет массив пользователей, которые имеют конкретную информацию о пользователях для каждого существующего пользователя с таким именем пользователя.Например,

{
  "_index" : "users",
  ...
 "source" : {
     "user_id" : "sysadmin",
     "username" : "sysadmin"
     "users": {
               {
                        "name" : null, \\ From t_a
                        "email" : "fabio.lp@abcd.com", \\ From t_a
                        "tenantid" : "e118af1f-6674-4348-bb12-xxxxxxxx27a6", \\ From t_a
                        "user_type" : 1050 \\ From t_b
               },
               {
                        "name" : "Administrator", \\ From t_a
                        "email" : "joao.lapa@abcd.com", \\ From t_a
                        "tenantid" : "e118af1f-6674-4348-bb12-9032bd2xxxxx", \\ From t_a
                        "user_type" : 1050 \\ From t_b
               }
      }
  }
}

Вот что я сейчас пытаюсь сделать.К сожалению, не работает:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://db_a;databaseName=t_a;user=xx;password=xxx;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "xx"
    jdbc_password => "xxx"

    statement => "SELECT * FROM Users order by username"
  }
}

filter {
  jdbc_streaming {
        jdbc_connection_string => "jdbc:sqlserver://db_b;databaseName=t_b;user=bbb;password=aaa;"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_user => "aaa"
        jdbc_password => "bbb"
        statement => "select ExternalID, UserTypeID from [User] where ExternalID = :userid"
        parameters => { "userid" => "id" }
        target => "extra"
        add_field => {
            "userType" => "%{[extra][0][UserTypeID]}"
        }
        remove_field => ["extra"]
    }

  aggregate {
    task_id => "%{username}"
    code => "
         map['username'] = event.get('username')
         map['users'] ||= [ ]
         map['users'] << {'name' => event.get('name'), 
                        'email' => event.get('email'),
                        'tenantid' => event.get('tenantid')}
         event.cancel()
       "
    push_previous_map_as_event => true
    timeout_task_id_field => "user_id"
    timeout => 3600
    inactivity_timeout => 300
    timeout_tags => ['_aggregatetimeout']
    }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users"
    }
}

Я пытался объяснить лучшее, что мог = / Я хочу знать, возможно ли это, и если да, то как я могу этого достичь.

...