Я создал индекс вasticsearch с простым вложенным объектом. Используя фильтр агрегации JDBC logstash, я пытаюсь загрузить документы и вложенный объект.в основном я создал индекс "tickets" и имею вложенный объект "invoices".мой вариант использования привел бы к соотношению 1: M между билетами и счетами.используя запрос sql я получаю билеты и счета.Я предоставил свой файл конфигурации logstash.когда я запускаю первый раз, он правильно создает вложенный объект заявок и накладных.я удалил индекс и попробовал то же самое снова загрузить билеты, на этот раз он не будет создавать счета в билетах.это перечисление только одного счета.когда я снова удаляю индекс и запускаю его, некоторое время он показывает правильно счета, но не всегда.Я не изменяю конфигурационный файл logstash, просто несколько раз запускаю результаты по-разному.Я не уверен, почему он работает один и другой раз, если я запускаю его, а не создает список счетов, а просто показывает первый счет, связанный с заявкой.ценим вашу помощь в этом.спасибо.
index:
------
{
"mappings": {
"tickets": {
"properties": {
"custid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"custname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"custpo": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"plantid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"plantname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"ticketid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"tktdate": {
"type": "date"
},
"htname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"htaddr1": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"htaddr2": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"truckid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"prodid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"proddesc": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"jobid": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"invprds": {
"type": "nested",
"properties": {
"invnum": {
"type": "integer"
},
"invdate": {
"type": "date"
}
}
}
}
}
}
}
logstash conf file:
--------------------
input {
jdbc {
jdbc_driver_library => "C:/DRIVERS/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "sample connection string"
jdbc_user => "user"
jdbc_password => "password"
jdbc_validate_connection => "true"
statement => "select DISTINCT TRES.ticketid, TRES.tktdate, TRES.plantid, TRES.plantname, TRES.custid, TRES.custname, TRES.custpo, TRES.htname, TRES.htaddr1, TRES.htaddr2, TRES.truckid, TRES.prodid, TRES.proddesc,TRES.jobid, IPRD.invnum, IPRD.invdate from erocks.TICKETFIELDS_RES TRES left outer join erocks.invoice_prod IPRD on TRES.ticketid = IPRD.TICKETNUMBER and TRES.tktdate = IPRD.SHIPDATE and TRES.PLANTID =IPRD.BUNUMBER left outer join erocks.INVOICE_HEADER IHDR on IPRD.INVNUM = IHDR.INVNUM where TRES.ticketid = \'101012\' order by 1,2,3"
}
}
filter {
aggregate {
task_id => "%{ticketid}%{tktdate}%{plantid}"
code => "
map['ticketid'] = event.get('ticketid')
map['tktdate'] = event.get('tktdate')
map['custid'] = event.get('custid')
map['plantid'] = event.get('plantid')
map['custname'] = event.get('custname')
map['custpo'] = event.get('custpo')
map['plantname'] = event.get('plantname')
map['htname'] = event.get('htname')
map['htaddr1'] = event.get('htaddr1')
map['htaddr2'] = event.get('htaddr2')
map['truckid'] = event.get('truckid')
map['prodid'] = event.get('prodid')
map['proddesc'] = event.get('proddesc')
map['jobid'] = event.get('jobid')
map['invprds'] ||= []
map['invprds'] << {
'invnum' => event.get('invnum'),
'invdate' => event.get('invdate')
}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
#timeout_tags => ['aggregated']
}
}
output {
elasticsearch {
action => "index"
hosts => "http://localhost:9200"
index => "poctickets"
document_type => "tickets"
document_id => "%{ticketid}%{tktdate}%{plantid}"
workers => 1
}
stdout{
codec => rubydebug
}
}