Optimización: Dumping JSON de una API de transmisión a Mongo
Fondo: tengo unpython
módulo configurado para capturar objetos JSON de una API de transmisión y almacenarlos (inserción masiva de 25 a la vez) en MongoDB utilizando pymongo. Para comparación, también tengo un comando bash paracurl
de la misma API de transmisión ypipe
amongoimport
. Ambos enfoques almacenan los datos en colecciones separadas.
Periódicamente, superviso elcount()
De las colecciones para comprobar cómo les va.
Hasta ahora, veo elpython
módulo retrasado por unos 1000 objetos JSON detrás de lacurl | mongoimport
enfoque.
Problema: ¿Cómo puedo optimizar mipython
módulo para estar ~ sincronizado con elcurl | mongoimport
?
No puedo usartweetstream
ya que no estoy usando la API de Twitter sino un servicio de transmisión de terceros.
¿Podría alguien ayudarme por favor aquí?
Python
módulo:
class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)
def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue
if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []
except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)
def __del__(self):
self.string_buffer.close()
Gracias por leer.