У меня есть конвейер logsta sh, который
- извлекает данные из mysql, используя входной соединитель jdb c
- , собирает данные для пользователей на основе идентификатора пользователя
- отправляет агрегированные данные в кластерasticsearch
Извлекает большой объем данных (например, 2 миллиона строк) с сервера mysql и использует выборку курсора с "jdbc_fetch_size" 100000, чтобы он не загружать все строки одновременно (не размер страницы, лимит + смещение, а fetch_size), чтобы избежать исключения из памяти.
Ниже моя конфигурация:
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://host/dbname?useCursorFetch=true"
jdbc_fetch_size=>"100000"
jdbc_user => ""
jdbc_password => ""
statement => "Select *
from users_list
left join user_posts on users_list.id = user_posts.user_id
left join user_friends on users_list.id = user_friends.user_id
order by users_list.id;"
}
}
filter {
aggregate {
task_id => "%{id}" #id of user
code => "
map['user_id'] ||= event.get('id')
map['name'] ||= event.get('name')
map['email'] ||= event.get('email')
if (event.get('post_id') != nil)
map['posts'] << {
'id' => event.get('post_id'),
'title' => event.get('post_title'),
'description' => event.get('post_description')}
end
if (event.get('friend_id') != nil)
map['friends'] << {
'id' => event.get('friend_id'),
'name' => event.get('friend_name')}
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
}
output {
elasticsearch {
document_id => "%{id}"
index => ""
hosts => [""]
user => ""
password => ""
}
}
Ниже моя эластичность c сопоставление поискового индекса:
{
"mappings": {
"properties": {
"user_id": {
"type": "long"
},
"posts": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text"
},
"description": {
"type": "text"
}
}
},
"friends": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text"
}
}
},
"email": {
"type": "text"
},
"name": {
"type": "text"
}
}
}
}
Кажется, все хорошо, но у меня есть проблема с агрегацией, мой вопрос:
- Как logsta sh обрабатывает разбиение на страницы с агрегацией, я не уверен над случаем, когда logsta sh выбирает первую партию из 100000 строк из mysql, существует вероятность того, что данные (строки) этого пользователя могут быть каким-то образом разделены при получении первых пакетов первая половина (связанные строки для этого пользователя) и следующая выборка получают вторую половину (для этого пользователя) позже, во втором круговом цикле, не существует ли вероятности того, что во время второго кругового цикла агрегированная карта (до этого момента) для этого пользователя очищается по таймауту, а ранее агрегированные данные сбрасываются logsta sh. Я знаю, что все должно быть хорошо, если мы не будем использовать пагинацию bcs, тогда не будет никакого разделения данных по горизонтали, но для этого объема данных мы должны использовать нумерацию страниц
- , если это так, как мы можем убедиться, что никакие данные не будут потеряны / перезаписаны второй партией, пока мы выполняем агрегацию с использованием разбиения на страницы?
- другой вопрос: logsta sh очищает первую партию событий до того, как она отправится на вторая поездка туда и обратно для следующей партии или она тоже сохраняет их в памяти?