Как загрузить данные порциями из фрейма данных pandas в фрейм искры

Я прочитал данные порциями по соединению pyodbc, используя что-то вроде этого:

import pandas as pd
import pyodbc
conn = pyodbc.connect("Some connection Details")
sql = "SELECT * from TABLES;"
df1 = pd.read_sql(sql,conn,chunksize=10)

Теперь я хочу прочитать все эти куски в одном кадре данных spark, используя что-то вроде:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

Проблема в том, когда я делаюdf2.count() я получаю результат как 10, что означает, что работает только случай i = 0. Это ошибка с unionAll. Я делаю что-то не так?

Ответы на вопрос(1)

Решение Вопроса

Документация для.unionAll() утверждает, что он возвращает новый фрейм данных, так что вам придется назначить обратноdf2 DataFrame:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

Кроме того, вы можете вместо этого использоватьenumerate() чтобы избежать необходимости управлятьi переменная себя:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))

Кроме того, документация для.unionAll() говорится, что.unionAll() устарела и теперь вы должны использовать.union() который действует как UNION ALL в SQL:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.union(sqlContext.createDataFrame(chunk))

Редактировать:
Более того, я перестану говорить дальше, но не раньше, чем я скажу больше: как говорит @ zero323, давайте не будем использовать.union() в петле. Давайте вместо этого сделаем что-то вроде:

def unionAll(*dfs):
    ' by @zero323 from here: http://stackoverflow.com/a/33744540/42346 '
    first, *rest = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

df_list = []
for chunk in df1:
    df_list.append(sqlContext.createDataFrame(chunk))

df_all = unionAll(df_list)
 zero32331 июл. 2016 г., 02:07
Кроме того, вы должны подумать дважды, прежде чем использоватьunion(All) в цикле :) Подсказка: есть такая вещь, как родословная.
 Gaurav Dhama31 июл. 2016 г., 01:52
Это сработало. Спасибо!!
 Gaurav Dhama31 июл. 2016 г., 11:55
Но, создав список df_list для всех чанков, вы не загрузите все чанки в память одновременно. Чанки в основном используются для того, чтобы мы могли загружать стол небольшими частями
 zero32331 июл. 2016 г., 02:13
Это будет работать только с RDD, поэтому вам понадобится немного больше кода. Вы также можете обрезать, но это ужасно в Python.
 bernie31 июл. 2016 г., 02:07
Кроме того, спасибо за комментарий!
 bernie31 июл. 2016 г., 02:16
@ zero323: пожалуйста, смотрите редактирование и дайте мне знать, что вы думаете. Благодарю.
 bernie31 июл. 2016 г., 02:10
@ zero323: я предложу, чтобы ОП создал список, а затем передал этот список.union()...
 bernie31 июл. 2016 г., 02:24
@ zero323: спасибо! Я включил ваш код - с указанием авторства и upvote - в мой ответ.
 zero32331 июл. 2016 г., 02:28
Отлично. Выглядит хорошо для меня.
 zero32331 июл. 2016 г., 02:21
Я уверен, что нет версии Varargsunion, Вы можете проверить последний фрагмент вstackoverflow.com/a/33744540/1560062 который все еще неоптимален, но не использует закрытый API. Также есть этоgithub.com/high-performance-spark/...

Ваш ответ на вопрос