Сделать GROUP BY с MapReduce в App Engine - PullRequest
1 голос
/ 12 февраля 2012

Я ищу способ выполнения операции GROUP BY в запросе в хранилище данных с использованием MapReduce.AFAIK App Engine не поддерживает саму GROUP BY в GQL, и другие разработчики предлагают хороший подход: MapReduce .

Я скачал исходный код , и я 'Я изучаю демонстрационный код , и я попытался реализовать его в моем случае.Но у меня не было успеха.Вот как я пытался это сделать.Может быть, все, что я сделал, неправильно.Поэтому, если бы кто-нибудь мог помочь мне сделать это, я бы поблагодарил.


Что я хочу сделать, так это: у меня есть группа контактов в хранилище данных, и у каждого контакта есть дата.Есть куча повторных контактов с одной и той же датой.Что я хочу сделать, так это просто собрать группу, собрать одинаковые контакты с одной и той же датой.

Например:

Допустим, у меня есть эти контакты:

  1. CONTACT_NAME: Foo1 |ДАТА: 01-10-2012
  2. CONTACT_NAME: Foo2 |ДАТА: 02-05-2012
  3. CONTACT_NAME: Foo1 |ДАТА: 01-10-2012

Поэтому после операции MapReduce это будет примерно так:

  1. CONTACT_NAME: Foo1 |ДАТА: 01-10-2012
  2. CONTACT_NAME: Foo2 |ДАТА: 02-05-2012

Для функции GROUP BY я думаю, что подсчет слов работает.


РЕДАКТИРОВАТЬ

Единственное, что отображается в журнале:

/ mapreduce / pipe / run 200

Запуск GetContactData.WordCountPipeline ( (u'2012-02-02)',), * {}) # da26a9b555e311e19b1e6d324d450c1a

END EDIT

Если я что-то не так делаю, и если я используюнеправильный подход для выполнения GROUP BY с MapReduce, помогите мне, как сделать это с MapReduce.


Вот мой код:

from Contacts import Contacts
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.api import mail
from google.appengine.ext.db import GqlQuery
from google.appengine.ext import db


from google.appengine.api import taskqueue
from google.appengine.api import users

from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler

import simplejson, logging, re


class GetContactData(webapp.RequestHandler):

    # Get the calls based on the user id
    def get(self):
        contactId = self.request.get('contactId')
        query_contacts = Contact.all()
        query_contacts.filter('contact_id =', int(contactId))
        query_contacts.order('-timestamp_')
        contact_data = []
        if query_contacts != None:
            for contact in query_contacts:
                    pipeline = WordCountPipeline(contact.date)
                    pipeline.start()
                    record = { "contact_id":contact.contact_id,
                               "contact_name":contact.contact_name,
                               "contact_number":contact.contact_number,
                               "timestamp":contact.timestamp_,
                               "current_time":contact.current_time_,
                               "type":contact.type_,
                               "current_date":contact.date }
                    contact_data.append(record)

        self.response.headers['Content-Type'] = 'application/json'
        self.response.out.write(simplejson.dumps(contact_data)) 

class WordCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Word count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, date):
    output = yield mapreduce_pipeline.MapreducePipeline(
        "word_count",
        "main.word_count_map",
        "main.word_count_reduce",
        "mapreduce.input_readers.DatastoreInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "date": date,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16)
    yield StoreOutput("WordCount", output)

class StoreOutput(base_handler.PipelineBase):
  """A pipeline to store the result of the MapReduce job in the database.

  Args:
    mr_type: the type of mapreduce job run (e.g., WordCount, Index)
    encoded_key: the DB key corresponding to the metadata of this job
    output: the blobstore location where the output of the job is stored
  """

  def run(self, mr_type, output):
      logging.info(output) # here I should append the grouped duration in JSON

1 Ответ

0 голосов
/ 15 февраля 2012

Я основывался на коде @autumngard, представленном в этом вопросе , и был изменен в соответствии с моими целями, и это сработало.

...