Otimização: Dumping JSON de uma API de streaming para o Mongo

Fundo: eu tenho umpython módulo configurado para capturar objetos JSON de uma API de streaming e armazená-los (inserção em massa de 25 por vez) no MongoDB usando pymongo. Para comparação, eu também tenho um comando bash paracurl da mesma API de streaming epipe paramongoimport. Ambas essas abordagens armazenam dados em coleções separadas.

Periodicamente, monitoracount() das coleções para verificar como eles se saem.

Até agora, vejo opython módulo atrasado por cerca de 1000 objetos JSON atrás docurl | mongoimport abordagem.

Problema: Como posso otimizar minhapython módulo para ser ~ em sincronia com ocurl | mongoimport?

Não posso usartweetstream desde que eu não estou usando a API do Twitter, mas um serviço de streaming de terceiros.

Alguém poderia me ajudar aqui?

Python módulo:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Obrigado pela leitura.

questionAnswers(2)

yourAnswerToTheQuestion