Я перемещаю данные из mysql в Elasti c Поиск с использованием logsta sh. Все работает нормально, но у меня есть проблемы с внутренними вложенными объектами. У меня есть пользователи, posts, posts_media и таблица комментариев. Я хочу сохранить единственный объект вasticsearch следующим образом.
{
"user": [{
"posts": [{
"post_media": [{
"bucket": "fayvo-sample",
"bg_color": "#C19081",
"home_file_height": 860,
},
{
"bucket": "fayvo-sample",
"bg_color": "#C19081",
"home_file_height": 860,
}
],
"comments": [{
"id": 3,
"title": "i like this ",
},
{
"id": 4,
"title": "nice ",
}
],
"id": 1,
"title": "Wearing my Biostrap",
"created_at": "2017-01-23T04:04:17.000Z",
"@timestamp": "2020-02-05T15:48:08.033Z"
},
{
"post_media": [{
"bucket": "fayvo-sample",
"bg_color": "#C19081",
"home_file_height": 860,
},
{
"bucket": "fayvo-sample",
"bg_color": "#C19081",
"home_file_height": 860,
}
],
"comments": [{
"id": 3,
"title": "i like this ",
},
{
"id": 4,
"title": "nice ",
}
],
"id": 2,
"title": "Wearing my Biostrap",
"created_at": "2017-01-23T04:04:17.000Z",
"user_id": 2222,
"@timestamp": "2020-02-05T15:48:08.033Z"
}
],
}]
}
Проблема в том, что я не могу сохранить вложенные объекты post_media и comments вasticsearch. если у поста три медиа, то вместо одного поста он создает три поста, также он должен обернуть три медиа-объекта в post_media для одного поста. Ниже приведен мой пример кода.
input{
jdbc {
jdbc_connection_string => "your db connection"
jdbc_user => ""
jdbc_password => ""
jdbc_validate_connection => true
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
use_column_value => true
tracking_column => "updated_timestamp"
tracking_column_type => "timestamp"
jdbc_default_timezone => "UTC"
schedule => "* * * * * *"
statement => "your query "
last_run_metadata_path => "/home/ubuntu/logstash_track_date/.logstash_newtest_jdbc_last_run"
}
}
filter {
aggregate {
task_id => "%{id}"
code => "
map['id'] = event.get('id')
map['posts'] ||= []
map['posts'] << {
'box_id' => event.get('box_id'),
'score' => event.get('score'),
'post_box_id' => event.get('post_box_id'),
'user_id' => event.get('user_id'),
'text_content' => event.get('text_content'),
'created_at' => event.get('post_created'),
'id' => event.get('post_id'),
'post_media' => [] << {
'user_post_id' => event.get('user_post_id'),
'file_width' => event.get('file_width'),
},
'comments' => [] << {
'id' => event.get('comment_id'),
'title' => event.get('comment_title'),
}
}
event.cancel()"
push_previous_map_as_event => true
timeout => 30
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => "posts"
document_type => "_doc"
document_id => "p-%{id}"
hosts => "localhost:9200"
routing => 1
doc_as_upsert => true
}
}