Streaming results with Blaze and SqlAlchemy
I am trying to use Blaze/Odo to read a large result set (~70 million rows) from Redshift. By default, SqlAlchemy tries to read the whole result into memory before it starts processing it. This can be prevented by either
execution_options(stream_results=True) on the engine/session, or
yield_per(sane_number) on the query.
When working from Blaze, the SqlAlchemy queries are generated behind the scenes, which leaves only the execution_options approach. Unfortunately, the following throws an error.
from sqlalchemy import create_engine
from blaze import Data
redshift_params = (redshift_user, redshift_pass, redshift_endpoint, port, dbname)
engine_string = "redshift+psycopg2://%s:%s@%s:%d/%s" % redshift_params
engine = create_engine(engine_string,
                       execution_options=dict(stream_results=True)
                       )
db = Data(engine)
The exception is:
...
/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.pyc in __buffer_rows(self)
1124 return
1125 size = getattr(self, '_bufsize', 1)
-> 1126 self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
1127 self._bufsize = self.size_growth.get(size, size)
1128 if self._max_row_buffer is not None:
InternalError: (psycopg2.InternalError) opening multiple cursors from within the same client connection is not allowed.
If I leave out execution_options=dict(stream_results=True), then the above works, but doing something like
odo(db.mytable, 'mytable.bcolz')
will run out of memory for large tables.
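To make the memory concern concrete, here is a minimal sketch of the difference between buffering a whole result and consuming it in chunks. It uses the standard library's sqlite3 as a stand-in for Redshift, so it only illustrates the DB-API pattern, not the Blaze/Odo code path; iter_rows and the table contents are made up for the illustration. With psycopg2, an ordinary cursor transfers the full result to the client on execute, which is why stream_results (a server-side cursor) is needed in addition to fetchmany.

```python
import sqlite3


def iter_rows(cursor, chunk_rows=1000):
    """Yield rows one at a time, holding at most chunk_rows of them in a list.

    Hypothetical helper for illustration; not part of Blaze, Odo or SqlAlchemy.
    """
    while True:
        chunk = cursor.fetchmany(chunk_rows)
        if not chunk:
            return
        for row in chunk:
            yield row


# In-memory SQLite database standing in for the Redshift table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (x INTEGER)")
conn.executemany("INSERT INTO mytable VALUES (?)", ((i,) for i in range(10000)))

# Buffered: the full result set is materialised as one Python list.
all_rows = conn.execute("SELECT x FROM mytable").fetchall()

# Incremental: rows are pulled in chunks and consumed as they arrive.
total = sum(row[0] for row in iter_rows(conn.execute("SELECT x FROM mytable")))
```

Both paths see the same rows; the difference is only in how many of them are held in Python objects at once.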
Using execution_options(stream_results=True) does work with pandas.read_sql_query. The following code works fine, using only moderate amounts of memory:
from sqlalchemy import create_engine
import pandas as pd
redshift_params = (redshift_user, redshift_pass, redshift_endpoint, port, dbname)
engine_string = "postgresql+psycopg2://%s:%s@%s:%d/%s" % redshift_params
engine = create_engine(engine_string,
                       execution_options=dict(stream_results=True)
                       )
compression = 'bz2'
# queryString holds the SQL text of the query to export (defined elsewhere)
res = pd.read_sql_query(queryString,
                        engine,
                        chunksize=2**20)
# write each chunk to its own compressed CSV file
for i, df in enumerate(res):
    df.to_csv('results-%s.csv.%s' % (i, compression), compression=compression)
This is the full stack trace:
...
Data(engine)
No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/mahler/anaconda/lib/python2.7/site-packages/blaze/interactive.py", line 122, in Data
dshape = discover(data)
File "/home/mahler/anaconda/lib/python2.7/site-packages/multipledispatch/dispatcher.py", line 164, in __call__
return func(*args, **kwargs)
File "/home/mahler/anaconda/lib/python2.7/site-packages/odo/backends/sql.py", line 242, in discover
return discover(metadata)
File "/home/mahler/anaconda/lib/python2.7/site-packages/multipledispatch/dispatcher.py", line 164, in __call__
return func(*args, **kwargs)
File "/home/mahler/anaconda/lib/python2.7/site-packages/odo/backends/sql.py", line 248, in discover
metadata.reflect(views=metadata.bind.dialect.supports_views)
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/sql/schema.py", line 3623, in reflect
bind.dialect.get_view_names(conn, schema)
File "<string>", line 2, in get_view_names
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/reflection.py", line 42, in cache
return fn(self, con, *args, **kw)
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/dialects/postgresql/base.py", line 2347, in get_view_names
for row in connection.execute(s)]
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 713, in __iter__
row = self.fetchone()
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1026, in fetchone
self.cursor, self.context)
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
exc_info
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1017, in fetchone
row = self._fetchone_impl()
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1139, in _fetchone_impl
self.__buffer_rows()
File "/home/mahler/anaconda/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 1126, in __buffer_rows
self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.InternalError: (psycopg2.InternalError) opening multiple cursors from within the same client connection is not allowed.
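The trace shows the error being raised inside metadata.reflect, that is, while odo is discovering the schema, before any table data is read. One thing that might be worth trying is to leave the engine unbuffered for reflection and request streaming only on the connection that runs the large query. The sketch below shows that pattern with Connection.execution_options; stream_query is a hypothetical helper, and an in-memory SQLite database stands in for Redshift so that the example is runnable. It bypasses Blaze's query generation, and whether it avoids the Redshift cursor error is an assumption that has not been verified.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool


def stream_query(engine, sql, chunk_rows=1000):
    """Yield lists of rows, requesting streaming on one connection only.

    Hypothetical helper: the engine keeps its default (buffered) behaviour,
    so reflection and other metadata queries are not affected.
    """
    with engine.connect() as conn:
        streaming = conn.execution_options(stream_results=True)
        result = streaming.execute(text(sql))
        while True:
            rows = result.fetchmany(chunk_rows)
            if not rows:
                break
            yield rows


# SQLite stand-in for Redshift; StaticPool keeps the single in-memory
# database alive across connect() calls. SQLite has no server-side
# cursors, so here the option only demonstrates the call pattern.
engine = create_engine("sqlite://", poolclass=StaticPool)
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE mytable (x INTEGER)"))
    conn.execute(text("INSERT INTO mytable (x) VALUES (:x)"),
                 [{"x": i} for i in range(2500)])

sizes = [len(chunk) for chunk in stream_query(engine, "SELECT x FROM mytable")]
```

With a real Redshift engine, each chunk could then be appended to the target (for example a bcolz table or a compressed CSV file) instead of being counted.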