MongoDB incremental mapReduce, выбрать только новые документы, добавленные после последнего mapReduce - PullRequest
8 голосов
/ 17 октября 2011

Допустим, у меня есть коллекция с документами, которая выглядит следующим образом (просто упрощенный пример, но должна показать схему):

> db.data.find()
{ "_id" : ObjectId("4e9c1f27aa3dd60ee98282cf"), "type" : "A", "value" : 11 }
{ "_id" : ObjectId("4e9c1f33aa3dd60ee98282d0"), "type" : "A", "value" : 58 }
{ "_id" : ObjectId("4e9c1f40aa3dd60ee98282d1"), "type" : "B", "value" : 37 }
{ "_id" : ObjectId("4e9c1f50aa3dd60ee98282d2"), "type" : "B", "value" : 1 }
{ "_id" : ObjectId("4e9c1f56aa3dd60ee98282d3"), "type" : "A", "value" : 85 }
{ "_id" : ObjectId("4e9c1f5daa3dd60ee98282d4"), "type" : "B", "value" : 12 }

Теперь мне нужно собрать статистику по этой коллекции. Например:

db.data.mapReduce(function(){
        emit(this.type,this.value);
     },function(key,values){
        var total = 0;
        for(i in values) {total+=values[i]};
        return total;
     },
{out:'stat'})

будет собирать итоги в коллекции 'stat'.

> db.stat.find()
{ "_id" : "A", "value" : 154 }
{ "_id" : "B", "value" : 50 }

На данный момент все идеально, но я застрял на следующем ходу:

  1. Коллекция 'data' постоянно обновляется новыми данными (старые документы остаются без изменений, только вставки, без обновлений)
  2. Я хотел бы периодически обновлять коллекцию 'stat', но не хочу каждый раз запрашивать всю коллекцию 'data', поэтому я решил запустить инкрементную mapReduce
  3. Может показаться целесообразным просто обновлять коллекцию stat для каждой вставки в коллекции данных и не использовать mapReduce, но реальный случай более сложный, чем в этом примере, и я хотел бы получать статистику только по запросу.
  4. Для этого я должен иметь возможность запрашивать только те документы, которые были добавлены после моего последнего mapReduce
  5. Насколько я понимаю, я не могу полагаться на свойство ObjectId, просто сохраните последний, а затем выберите каждый документ с сохраненным ObjectId>, поскольку ObjectId не совпадает с идентификаторами автоинкремента в базах данных SQL (например, разные шарды будут создавать разные ObjectId).
  6. Я могу изменить генератор ObjectId, но не уверен, как это сделать лучше в защищенной среде

Так что вопрос:

Можно ли выбрать только документы, добавленные после последнего mapReduce для запуска пошагового mapReduce, или, может быть, существует другая стратегия обновления статистических данных для постоянно растущей коллекции?

Ответы [ 4 ]

4 голосов
/ 19 октября 2011

Вы можете кэшировать время и использовать его в качестве барьера для вашего следующего приращения карты.

Мы тестируем это на работе, и, похоже, оно работает. Поправьте меня, если я ошибаюсь, но вы не можете безопасно уменьшить карту, пока вставка происходит через осколки. Версии становятся несовместимыми, и ваша операция сокращения карты завершится неудачно. (Если вы найдете решение для этого, пожалуйста, дайте мне знать!:)

Вместо этого мы используем пакетные вставки, один раз каждые 5 минут. После того, как все массовые вставки выполнены, мы запускаем map-Reduce так (в Python):

m = Code(<map function>)
r = Code(<reduce function>)

# pseudo code
end = last_time + 5 minutes

# Use time and optionally any other keys you need here
q = bson.SON([("date" : {"$gte" : last_time, "$lt" : end})])

collection.map_reduce(m, r, out=out={"reduce": <output_collection>}, query=q)

Обратите внимание, что мы использовали reduce, а не merge, потому что мы не хотим переопределять то, что у нас было раньше; мы хотим объединить старые результаты и новый результат с помощью одной и той же функции сокращения.

4 голосов
/ 17 октября 2011

Вы можете получить только временную часть идентификатора, используя _id.getTime() (из: http://api.mongodb.org/java/2.6/org/bson/types/ObjectId.html).. Это должно быть отсортировано по всем осколкам.

РЕДАКТИРОВАТЬ: Извините, это были документы Java ... Версия JS выглядит как _id.generation_time.in_time_zone (Time.zone), с http://mongotips.com/b/a-few-objectid-tricks/

2 голосов
/ 30 июня 2012

Я написал полное решение на основе pymongo, которое использует пошаговое уменьшение карт, кэширует время и ожидает выполнения в задании cron. Он блокируется, поэтому два не могут работать одновременно:

https://gist.github.com/2233072

""" This method performs an incremental map-reduce on any new data in 'source_table_name' 
into 'target_table_name'.  It can be run in a cron job, for instance, and on each execution will
process only the new, unprocessed records.  

The set of data to be processed incrementally is determined non-invasively (meaning the source table is not 
written to) by using the queued_date field 'source_queued_date_field_name'. When a record is ready to be processed, 
simply set its queued_date (which should be indexed for efficiency). When incremental_map_reduce() is run, any documents 
with queued_dates between the counter in 'counter_key' and 'max_datetime' will be map/reduced.

If reset is True, it will drop 'target_table_name' before starting.

If max_datetime is given, it will only process records up to that date.

If limit_items is given, it will only process (roughly) that many items. If multiple
items share the same date stamp (as specified in 'source_queued_date_field_name') then
it has to fetch all of those or it'll lose track, so it includes them all. 

If unspecified/None, counter_key defaults to counter_table_name:LastMaxDatetime.
"""
0 голосов
/ 19 августа 2014

Мы решаем эту проблему, используя «нормализованные» ObjectIds. Шаги, которые мы делаем:

  1. normalize id - взять timestap из текущего / сохраненного / последнего обработанного id и установить другое части id до его минимальных значений. Код C #: new ObjectId(objectId.Timestamp, 0, short.MinValue, 0)
  2. запустить карту-уменьшить со всеми элементами, которые имеют идентификатор больше нашего нормализованного идентификатора, пропустить уже обработанные элементы.
  3. сохранить последний обработанный идентификатор и отметить все обработанные элементы.

Примечание: некоторые граничные элементы будут обработаны несколько раз. Чтобы исправить это, мы устанавливаем какой-то флаг в обработанных элементах.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...