Exemplo de contador simples usando mapreduce no Google App Engine

Estou um pouco confuso com o estado atual do suporte de mapreduce no GAE. De acordo com os documentoshttp: //code.google.com/p/appengine-mapreduce fase de redução ainda não é suportada, mas na descrição da sessão de E / S 2011 http: //www.youtube.com/watch? v = EIxelKcyCC0) está escrito "Agora é possível executar tarefas completas de Redução de mapa no App Engine". Gostaria de saber se posso usar mapreduce nesta tarefa:

O que eu quero fazer

Tenho modelo de carro com a cor do campo:

class Car(db.Model):
    color = db.StringProperty()

Eu quero executar um processo de mapreduce (de tempos em tempos, definido pelo cron) que pode calcular quantos carros existem em cada cor e armazenar esse resultado no armazenamento de dados. Parece um trabalho adequado para mapreduce (mas, se estiver errado, corrija-me), a fase "map" produzirá pares (, 1) para cada entidade Car, e a fase "reduzir" deverá mesclar esses dados por color_name, fornecendo os resultados esperados . O resultado final que desejo obter são entidades com dados computados armazenados no armazenamento de dados, algo como:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

Problema Não sei como implementar isso no appengine ... O vídeo mostra exemplos com funções definidas de mapa e redução, mas parecem exemplos muito gerais, não relacionados ao armazenamento de dados. Todos os outros exemplos que encontrei estão usando uma função para processar os dados do DatastoreInputReader, mas parecem ser apenas a fase "map", não há exemplo de como fazer a "redução" (e como armazenar resultados reduzidos no banco de dados)

questionAnswers(2)

Você realmente não precisa de uma fase de redução. Você pode fazer isso com uma cadeia de tarefas linear, mais ou menos da seguinte maneira:

def count_colors(limit=100, totals={}, cursor=None):
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    try:
      totals[car.color] += 1
    except KeyError:
      totals[car.color] = 1
  if len(cars) == limit:
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  entities = []
  for color in totals:
    entity = CarsByColor(key_name=color)
    entity.cars_num = totals[color]
    entities.append(entity)
  db.put(entities)

deferred.defer(count_colors)

Isso deve percorrer todos os seus carros, passar um cursor de consulta e um registro em execução para uma série de tarefas ad-hoc e armazenar os totais no fina

Uma fase de redução pode fazer sentido se você tiver que mesclar dados de vários datastores, vários modelos ou vários índices em um único modelo. Como é, acho que não compraria nada para voc

Outra opção: use a fila de tarefas para manter os contadores ativos para cada cor. Ao criar um carro, inicie uma tarefa para aumentar o total dessa cor. Ao atualizar um carro, inicie uma tarefa para diminuir a cor antiga e outra para aumentar a nova cor. Atualize os contadores de maneira transacional para evitar condições de corrid

QuestionSolution

Estou fornecendo aqui a solução que descobri eventualmente usando mapreduce do GAE (sem fase de redução). Se eu tivesse começado do zero, provavelmente usaria a solução fornecida porDrew Sears.

Funciona em python GAE 1.5.0

Em app.yaml Adicionei o manipulador para mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

e o manipulador do meu código para mapreduce (estou usando url / mapred_update para reunir os resultados produzidos por mapreduce):

- url: /mapred_.*
  script: mapred.py

Criada mapreduce.yaml para processar entidades Automóvel:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

Explanation: done_callback é um URL que é chamado depois que o mapreduce termina suas operações. mapred.process é uma função que processa entidades individuais e atualiza contadores (é definido no arquivo mapred.py). ModeloCarr é definido em models.py

mapred.py:

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

Há uma definição ligeiramente alterada do modelo CarsByColor em comparação com a pergunt

Você pode iniciar o trabalho de mapreduce manualmente a partir de url:http: // yourapp / mapreduce / e espero que do cron (ainda não testei o cron

yourAnswerToTheQuestion