Вложенный документ в эластичный поиск с использованием logstash - PullRequest
0 голосов
/ 30 января 2020

Привет. Все, что я пытаюсь проиндексировать документы с сервера MS SQL наasticsearch, используя logsta sh. Я хотел, чтобы мои документы принимались как вложенные документы, но я получаю совокупную ошибку исключения

Здесь я помещаю весь свой код

Create table department(
ID Int identity(1,1) not null,
Name varchar(100)
)

Insert into department(Name)
Select 'IT Application development'

union all

Select 'HR & Marketing'

Create table Employee(
ID Int identity(1,1) not null,
emp_Name varchar(100),
dept_Id int
)

Insert into Employee(emp_Name,dept_Id)
Select 'Mohan',1
union all
Select 'parthi',1
union all
Select 'vignesh',1

Insert into Employee(emp_Name,dept_Id)
Select 'Suresh',2
union all
Select 'Jithesh',2
union all
Select 'Venkat',2

Окончательный оператор выбора

SELECT 
De.id AS id,De.name AS deptname,Emp.id AS empid,Emp.emp_name AS empname
FROM  department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
ORDER BY De.id

Результат должен быть таким

enter image description here

Моя эластичность c сопоставление поиска

PUT /departments
{
 "mappings": {
   "properties": {
     "id":{
       "type":"integer"
     },
     "deptname":{
       "type":"text"
     },
     "employee_details":{
       "type": "nested",
       "properties": {
         "empid":{
           "type":"integer"
         },
         "empname":{
           "type":"text"
         }
       }
     }
   }
 }
}

Мой logsta sh файл конфигурации

input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
jdbc_user => "xxxx"
jdbc_password => "xxxx"
statement => "SELECT 
De.id AS id,De.name AS deptname,Emp.id AS empid,Emp.emp_name AS empname
FROM  department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
ORDER BY De.id"
}
}
filter{
        aggregate {
        task_id => "%{id}"
        code => "
        map['id'] = event['id']
        map['deptname'] = event['deptname']
        map['employee_details'] ||= []
        map['employee_details'] << {'empId' => event['empid'], 'empname' => event['empname'] }
        "
        push_previous_map_as_event => true
        timeout => 5
        timeout_tags => ['aggregated']
        } 
    }
    output{
    stdout{ codec => rubydebug }
    elasticsearch{
            hosts => "https://d9bc7cbca5ec49ea96a6ea683f70caca.eastus2.azure.elastic-cloud.com:4567"
            user => "elastic"
            password => "****"
            index => "departments"
            action => "index"
            document_type => "departments"
            document_id => "%{id}"
           }

}

во время работы logsta sh я получаю ниже ошибка

enter image description here

Elasti c поиск снимка экрана для справки

enter image description here

мой вывод эластичного поиска должен выглядеть примерно так

{
  "took" : 398,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "departments",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "deptname" : "IT Application development"
          "employee_details" : [
          {
          "empid" : 1,
          "empname" : "Mohan"
           },
          {
          "empid" : 2,
          "empname" : "Parthi"
          },
          {
          "empid" : 3,
          "empname" : "Vignesh"
           }
          ]
         }
      }
    ]
  }
}

Может ли кто-нибудь помочь мне решить эту проблему? Я хочу, чтобы empname и empid всех сотрудников вставлялись как вложенный документ для соответствующего отдела. Заранее спасибо

1 Ответ

0 голосов
/ 04 февраля 2020

Вместо совокупного фильтра, который я использовал JDBC_STREAMING, он работает нормально, может быть полезным для тех, кто просматривает этот пост.

input {
jdbc {
jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxx;"
jdbc_user => "xxx"
jdbc_password => "xxxx"
statement => "Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy"
}
}
filter{
jdbc_streaming {
jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
jdbc_user => "xxxx"
jdbc_password => "xxxx"
statement => "select claimnumber,claimtype,is_active from claim where policynumber = :policynumber"
parameters => {"policynumber" => "policynumber"}
target => "claim_details"
}
}
output {
elasticsearch {
hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:9243"
user => "elastic"
password => "xxxx"
index => "xxxx"
action => "index"
document_type => "_doc"
document_id => "%{policynumber}"

}
stdout { codec => rubydebug }
}
...