Logstash 2.3.4 Как загрузить вложенный документ вasticsearch с помощью плагина logstash-jdbc - PullRequest
0 голосов
/ 14 мая 2019

В настоящее время я использую эластичный поиск 2.3.4 и logstash 2.3.4 для загрузки реляционных данных из базы данных Oracle в индекс эластичного поиска с использованием плагина logstash-jdbc. Как предлагается в различных сообщениях, я использую агрегатный фильтр для этого. Тем не менее, я не могу загрузить внутренний вложенный объект в документ. Значения не отображаются в поля и отображаются как NULL.

У меня есть два связанных объекта со следующими данными:

    CREATE TABLE DEPARTMENT (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL
    )

    CREATE TABLE EMPLOYEE (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL,
        departmentid NUMBER,
        CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
    ) 


    insert into DEPARTMENT values (1, 'dept1');
    insert into DEPARTMENT values (2, 'dept2');
    insert into DEPARTMENT values (3, 'dept3');
    insert into DEPARTMENT values (4, 'dept4');

    insert into EMPLOYEE values (1, 'emp1', 1);
    insert into EMPLOYEE values (2, 'emp2', 1);
    insert into EMPLOYEE values (3, 'emp3', 1);
    insert into EMPLOYEE values (4, 'emp4', 2);
    insert into EMPLOYEE values (5, 'emp5', 2);
    insert into EMPLOYEE values (6, 'emp6', 3);`

Вот мое отображение:

   {
        "mappings": {
            "departments": {
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    "deptName": {
                        "type": "string"
                    },          
                    "employee_details": {
                        "type": "nested",
                        "properties": {
                            "empId": {
                                "type": "integer"
                            },
                            "empName": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        }
    }

И это моя конфигурация logstash:

  input{
        jdbc{
            jdbc_validate_connection => true
            jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
            jdbc_user => "user"
            jdbc_password => "pwd"
            jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            statement => "SELECT 
                            department.id AS id,
                            department.name AS deptName,
                            employee.id AS empId,
                            employee.name AS empName
                        FROM  department LEFT JOIN employee  
                        ON department.id = employee.departmentid
                        ORDER BY id"
        }
    }

    filter{
        aggregate {
            task_id => "%{id}"
            code => "
            map['id'] = event['id']
            map['deptName'] = event['deptName'] #solution - deptName should be in smaller case and other fields too.
            map['employee_details'] ||= []
            map['employee_details'] << {'empId' => event['empId], 'empName' => event['empName'] }
            "

            push_previous_map_as_event => true
            timeout => 5
            timeout_tags => ['aggregated']
        }       
    }

    output{
    stdout{ codec => rubydebug }
        elasticsearch{
            action => "index"
            index => "my_index"
            document_type => "departments"
            document_id => "%{id}"
            hosts => "localhost:9200"
        }
    }

Когда я выполняю XGET для всех документов: curl -XGET 'localhost: 9200 / my_index / _search /? pretty = true & q = :

Значения не сопоставляются с полями и отображаются как NULL:

      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "2",
            "_score": 1,
            "_source": {
              "id": 2,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "4",
            "_score": 1,
            "_source": {
              "id": 4,
              "deptname": "dept4",
              "empid": null,
              "empname": null,
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.367Z",
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "1",
            "_score": 1,
            "_source": {
              "id": 1,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "3",
            "_score": 1,
            "_source": {
              "id": 3,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.492Z",
              "tags": [
                "aggregated"
              ]
            }
          }
        ]
      }
    }

rubydebug предполагает, что значения установлены в «ноль». Может ли кто-нибудь помочь мне с тем, что я делаю здесь неправильно?

Вот фрагмент из stdout для документа с id = 1:

{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 1.0,
       "empname" => "emp1",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:14.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 2.0,
       "empname" => "emp2",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 3.0,
       "empname" => "emp3",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
                  "id" => 1.0,
            "deptName" => nil,
    "employee_details" => [
        [0] {
              "empId" => nil,
            "empName" => nil
        },
        [1] {
              "empId" => nil,
            "empName" => nil
        },
        [2] {
              "empId" => nil,
            "empName" => nil
        }
    ],
            "@version" => "1",
          "@timestamp" => "2019-05-14T12:32:15.381Z",
                "tags" => [
        [0] "aggregated"
    ]
}
...