Я хотел бы объединить информацию из двух разных таблиц в разных базах данных.В таблице t_a из базы данных db_a у меня есть информация о пользователях, такая как имя пользователя, имя, номер телефона и т. Д. В таблице t_b из базы данных db_b у меня есть специфическая для системы информация о пользователях.Например, для этой системы пользователь X имеет тип Y. Таблица t_b имеет поле, указывающее идентификатор пользователя в таблице t_a, и именно так я могу связать две таблицы.Я хочу сгруппировать пользователей по имени пользователя в Elastic, и внутри каждого документа будет массив пользователей, которые имеют конкретную информацию о пользователях для каждого существующего пользователя с таким именем пользователя.Например,
{
"_index" : "users",
...
"source" : {
"user_id" : "sysadmin",
"username" : "sysadmin"
"users": {
{
"name" : null, \\ From t_a
"email" : "fabio.lp@abcd.com", \\ From t_a
"tenantid" : "e118af1f-6674-4348-bb12-xxxxxxxx27a6", \\ From t_a
"user_type" : 1050 \\ From t_b
},
{
"name" : "Administrator", \\ From t_a
"email" : "joao.lapa@abcd.com", \\ From t_a
"tenantid" : "e118af1f-6674-4348-bb12-9032bd2xxxxx", \\ From t_a
"user_type" : 1050 \\ From t_b
}
}
}
}
Вот что я сейчас пытаюсь сделать.К сожалению, не работает:
input {
jdbc {
jdbc_connection_string => "jdbc:sqlserver://db_a;databaseName=t_a;user=xx;password=xxx;"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_user => "xx"
jdbc_password => "xxx"
statement => "SELECT * FROM Users order by username"
}
}
filter {
jdbc_streaming {
jdbc_connection_string => "jdbc:sqlserver://db_b;databaseName=t_b;user=bbb;password=aaa;"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_user => "aaa"
jdbc_password => "bbb"
statement => "select ExternalID, UserTypeID from [User] where ExternalID = :userid"
parameters => { "userid" => "id" }
target => "extra"
add_field => {
"userType" => "%{[extra][0][UserTypeID]}"
}
remove_field => ["extra"]
}
aggregate {
task_id => "%{username}"
code => "
map['username'] = event.get('username')
map['users'] ||= [ ]
map['users'] << {'name' => event.get('name'),
'email' => event.get('email'),
'tenantid' => event.get('tenantid')}
event.cancel()
"
push_previous_map_as_event => true
timeout_task_id_field => "user_id"
timeout => 3600
inactivity_timeout => 300
timeout_tags => ['_aggregatetimeout']
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "users"
}
}
Я пытался объяснить лучшее, что мог = / Я хочу знать, возможно ли это, и если да, то как я могу этого достичь.