Дано:
- Несколько миллионов записей в коллекции монго.
- Каждая запись имеет 10 полей, из которых 4 составляют составной неуникальный индекс, давайте назовем их КЛЮЧОМ.
- Каждая запись имеет временную метку.
- Некоторые записи имеют одинаковое значение KEY.
- Вполне возможно, что один и тот же КЛЮЧ найден в тысячах записей.
Я хотел бы создать другую коллекцию, содержащую подмножество исходной коллекции, где я хочу ограничить количество дубликатов на каждый KEY не более чем какой-либо константой, например 1000, где должны быть только самые последние дубликаты. включен.
Итак, если есть 10000 записей с одним и тем же KEY, то в новой коллекции будет только 1000 самых последних.
Ниже приведен мой код для создания агрегированной коллекции, содержащей все исходные записи, сгруппированные по KEY. Итак, я упустил часть сохранения только самой последней 1000, но мой код уже крайне неэффективен, поэтому я решил, что уже делаю что-то не так:
from pymongo import Connection
def main():
with Connection() as connection:
mydb = connection.mydb
try:
mydb.aggregated.drop()
mydb.static.map_reduce("""
// map
function() {
emit({
indexed_field1: this.indexed_field1,
indexed_field2: this.indexed_field2,
indexed_field3: this.indexed_field3
}, {
id: this._id,
ts: this.ts,
// other fields
});
}
""", """
// reduce - group the records with the same KEY
// return the given values array wrapped in an object
function(key, values) {
for (var i = 0; i < values.length; ++i) {
if (values[i].items) {
values[i] = values[i].items;
}
}
return {items: values};
}
""", 'aggregated', finalize="""
// finalize by flattening the value, which is likely to be an array of nested arrays
function(key, value) {
function flatten(value, collector) {
var items = value;
if (!(value instanceof Array)) {
if (!value.items) {
collector.push(value);
return;
}
items = value.items;
}
for (var i = 0; i < items.length; ++i) {
flatten(items[i], collector);
}
}
var collector = [];
flatten(value, collector);
return collector;
}
""")
except Exception as exc:
print exc
if __name__ == "__main__":
main()
Другая проблема заключается в том, что я нарушаю принципал, который должен возвращать тот же тип, что и map, но я думаю, что в моем случае все в порядке, потому что мои операции сокращения и завершения работают с ним.
Такое ощущение, что я на неправильном пути. Советы?
EDIT
Данные выглядят так:
{_id: , key1: , key2: , key3: , ts: , bla-bla-bla}
Например:
- 20000 записей с
(key1,key2,key3) == ('yaba', 'daba', 'doo')
- 15 000 записей с
(key1,key2,key3) == ('yogi', 'bear', '')
- 700 записей с
(key1,key2,key3) == ('yo', 'ho', 'ho')
- и т. Д.
В конце процесса мне нужно оставить:
- 1000 самых последних записей Яба-Даба-Ду
- 1000 самых последних записей о йоге-медведе
- все записи йо-хо-хо (потому что их меньше 1000)
- и т. Д.