Фильтр агрегатов jstbc logstash не создает массив объектов в документе - PullRequest
0 голосов
/ 16 октября 2018

Я создал индекс вasticsearch с простым вложенным объектом. Используя фильтр агрегации JDBC logstash, я пытаюсь загрузить документы и вложенный объект.в основном я создал индекс "tickets" и имею вложенный объект "invoices".мой вариант использования привел бы к соотношению 1: M между билетами и счетами.используя запрос sql я получаю билеты и счета.Я предоставил свой файл конфигурации logstash.когда я запускаю первый раз, он правильно создает вложенный объект заявок и накладных.я удалил индекс и попробовал то же самое снова загрузить билеты, на этот раз он не будет создавать счета в билетах.это перечисление только одного счета.когда я снова удаляю индекс и запускаю его, некоторое время он показывает правильно счета, но не всегда.Я не изменяю конфигурационный файл logstash, просто несколько раз запускаю результаты по-разному.Я не уверен, почему он работает один и другой раз, если я запускаю его, а не создает список счетов, а просто показывает первый счет, связанный с заявкой.ценим вашу помощь в этом.спасибо.

index:
------
{
    "mappings": {
      "tickets": {
        "properties": {
          "custid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "custname": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "custpo": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "plantid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "plantname": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "ticketid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "tktdate": {
            "type": "date"
          },
          "htname": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "htaddr1": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "htaddr2": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "truckid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "prodid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "proddesc": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "jobid": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "invprds": {
            "type": "nested",
            "properties": {
                "invnum": {
                    "type": "integer"
                },
                "invdate": {
                    "type": "date"
                }
            }
          }
        }
      }
    }
}

logstash conf file:
--------------------
input {
    jdbc {
        jdbc_driver_library => "C:/DRIVERS/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "sample connection string"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_validate_connection => "true"
        statement => "select DISTINCT TRES.ticketid, TRES.tktdate, TRES.plantid, TRES.plantname, TRES.custid, TRES.custname, TRES.custpo, TRES.htname, TRES.htaddr1, TRES.htaddr2, TRES.truckid, TRES.prodid, TRES.proddesc,TRES.jobid, IPRD.invnum, IPRD.invdate from erocks.TICKETFIELDS_RES TRES left outer join erocks.invoice_prod IPRD on TRES.ticketid = IPRD.TICKETNUMBER and TRES.tktdate = IPRD.SHIPDATE and TRES.PLANTID =IPRD.BUNUMBER left outer join erocks.INVOICE_HEADER IHDR on IPRD.INVNUM = IHDR.INVNUM where TRES.ticketid = \'101012\' order by 1,2,3"
    }
}

filter {
     aggregate {
       task_id => "%{ticketid}%{tktdate}%{plantid}"
       code => "
         map['ticketid'] = event.get('ticketid')
         map['tktdate'] = event.get('tktdate')
         map['custid'] = event.get('custid')
         map['plantid'] = event.get('plantid')
         map['custname'] = event.get('custname')
         map['custpo'] = event.get('custpo')
         map['plantname'] = event.get('plantname')
         map['htname'] = event.get('htname')
         map['htaddr1'] = event.get('htaddr1')
         map['htaddr2'] = event.get('htaddr2')
         map['truckid'] = event.get('truckid')
         map['prodid'] = event.get('prodid')
         map['proddesc'] = event.get('proddesc')
         map['jobid'] = event.get('jobid')
         map['invprds'] ||= []
         map['invprds'] << {
            'invnum' => event.get('invnum'), 
            'invdate' => event.get('invdate')
        }
         event.cancel()
       "
       push_previous_map_as_event => true
       timeout => 3
       #timeout_tags => ['aggregated']
    }
}

output {
    elasticsearch {
        action => "index"
        hosts => "http://localhost:9200"
        index => "poctickets"
        document_type => "tickets"
        document_id => "%{ticketid}%{tktdate}%{plantid}"
        workers => 1
    }
    stdout{
        codec => rubydebug
    }
}
...