Optimierung: JSON-Dump von einer Streaming-API nach Mongo
Hintergrund: Ich habe einpython
Modul, das so eingerichtet ist, dass es JSON-Objekte von einer Streaming-API abruft und sie mithilfe von Pymongo in MongoDB speichert (Masseneinfügung von jeweils 25). Zum Vergleich habe ich auch einen bash Befehl dazucurl
von der gleichen Streaming-API undpipe
es zumongoimport
. In beiden Ansätzen werden Daten in separaten Sammlungen gespeichert.
In regelmäßigen Abständen überwache ich diecount()
der Sammlungen, um zu überprüfen, wie sie sich schlagen.
Soweit sehe ich daspython
Modul um ca. 1000 JSON-Objekte hinter demcurl | mongoimport
Ansatz.
Problem: Wie kann ich meine optimieren?python
Modul, um ~ mit dem synchron zu seincurl | mongoimport
?
Ich kann nicht benutzentweetstream
da ich nicht die Twitter API benutze sondern einen 3rd Party Streaming Service.
Könnte mir bitte jemand hier raushelfen?
Python
Modul:
class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)
def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue
if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []
except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)
def __del__(self):
self.string_buffer.close()
Danke fürs Lesen.