Как суммировать данные в Google AppEngine - PullRequest
2 голосов
/ 27 марта 2011

Я пытаюсь реализовать сводное представление для большого (ish) набора данных с помощью AppEngine.

Моя модель выглядит примерно так:

def TxRecord(db.Model):
    expense_type = db.StringProperty()
    amount = db.IntegerProperty()

def ExpenseType(db.Model):
    name = db.StringProperty()
    total = db.IntegerProperty()

В моем хранилище данных содержится 100 тыс. Экземпляров.из TxRecord и я хотел бы суммировать их по expense_type.

В sql это было бы что-то вроде:

select expense_type as name, sum(amount) as total 
    from TxRecord
    group by expense_type

То, что я сейчас делаю, использует Python MapReduce framework для итерации по всем TxRecords с использованием следующего преобразователя:

def generate_expense_type(rec):
    expense_type = type.get_or_insert(name, name = rec.expense_type)
    expense_type.total += rec.amount

    yield op.db.Put(expense_type)

Кажется, это работает, но я чувствую, что должен запустить его, используя shard_count1, чтобы гарантировать, что общее количество не будет перезаписано одновременными записями.

Есть ли стратегия, которую я могу использовать, чтобы преодолеть эту проблему с помощью AppEngine или это так?

Ответы [ 3 ]

3 голосов
/ 27 марта 2011

MapReduce отлично подходит для автономной обработки данных, и мне нравится решение Дэвида для обработки счетчиков (+1 upvote).

Я просто хотел упомянуть еще один вариант: обрабатывать данные по мере их поступления. Ознакомьтесь с высокопроизводительными конвейерами данных Бретта Слаткина в App Engine доклад IO 2010.

Я реализовал эту технику в простом фреймворке ( slagg ), вы можете найти мой пример группировки с использованием свертывания даты .

3 голосов
/ 28 марта 2011

Использование mapreduce - правильный подход. Счетчики, как предполагает Дэвид, являются одним из вариантов, но они ненадежны (они используют memcache) и не предназначены для одновременного хранения большого количества счетчиков.

У вашего текущего mapreduce есть несколько проблем: во-первых, get_or_insert выполняет транзакцию хранилища данных каждый раз, когда она вызывается. Затем вы обновляете сумму за пределами транзакции и асинхронно сохраняете ее во второй раз, создавая проблему параллелизма, которая вас беспокоит.

По крайней мере, пока не будет полностью поддержан Reduce, ваш лучший вариант - сделать полное обновление в маппере в транзакции, например так:

def generate_expense_type(rec):
    def _tx():
      expense_type = type.get(name)
      if not expense_type:
        expense_type = type(key_name=name)
      expense_type.total += rec.amount
      expense_type.put()
    db.run_in_transaction(expense_type)
3 голосов
/ 27 марта 2011

Хорошей идеей является использование инфраструктуры MapReduce. Вы можете использовать более одного шарда, если используете счетчики, предоставляемые платформой MapReduce. Поэтому вместо того, чтобы каждый раз изменять хранилище данных, вы можете сделать что-то вроде этого:

yield op.counters.Increment("total_<expense_type_name>", rec.amount)

После того, как MapReduce завершит работу (надеюсь, гораздо быстрее, чем когда вы использовали только один осколок), вы можете скопировать окончательно созданные счетчики в вашу сущность хранилища данных.

...