Optimierung: JSON-Dump von einer Streaming-API nach Mongo

Hintergrund: Ich habe einpython Modul, das so eingerichtet ist, dass es JSON-Objekte von einer Streaming-API abruft und sie mithilfe von Pymongo in MongoDB speichert (Masseneinfügung von jeweils 25). Zum Vergleich habe ich auch einen bash Befehl dazucurl von der gleichen Streaming-API undpipe es zumongoimport. In beiden Ansätzen werden Daten in separaten Sammlungen gespeichert.

In regelmäßigen Abständen überwache ich diecount() der Sammlungen, um zu überprüfen, wie sie sich schlagen.

Soweit sehe ich daspython Modul um ca. 1000 JSON-Objekte hinter demcurl | mongoimport Ansatz.

Problem: Wie kann ich meine optimieren?python Modul, um ~ mit dem synchron zu seincurl | mongoimport?

Ich kann nicht benutzentweetstream da ich nicht die Twitter API benutze sondern einen 3rd Party Streaming Service.

Könnte mir bitte jemand hier raushelfen?

Python Modul:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Danke fürs Lesen.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage