Optimización: Dumping JSON de una API de transmisión a Mongo

Fondo: tengo unpython módulo configurado para capturar objetos JSON de una API de transmisión y almacenarlos (inserción masiva de 25 a la vez) en MongoDB utilizando pymongo. Para comparación, también tengo un comando bash paracurl de la misma API de transmisión ypipe amongoimport. Ambos enfoques almacenan los datos en colecciones separadas.

Periódicamente, superviso elcount() De las colecciones para comprobar cómo les va.

Hasta ahora, veo elpython módulo retrasado por unos 1000 objetos JSON detrás de lacurl | mongoimport enfoque.

Problema: ¿Cómo puedo optimizar mipython módulo para estar ~ sincronizado con elcurl | mongoimport?

No puedo usartweetstream ya que no estoy usando la API de Twitter sino un servicio de transmisión de terceros.

¿Podría alguien ayudarme por favor aquí?

Python módulo:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Gracias por leer.

Respuestas a la pregunta(2)

Su respuesta a la pregunta