¿Cómo se transpone un marco de datos dask (convertir columnas en filas) para abordar los principios de datos ordenados

TLDR: Creé un marco de datos dask de una bolsa dask. El marco de datos dask trata cada observación (evento) como una columna. Entonces, en lugar de tener filas de datos para cada evento, tengo una columna para cada evento. El objetivo es transponer las columnas a filas de la misma manera que los pandas pueden transponer un marco de datos usando df.T.

Detalles: Yo tengomuestra de datos de Twitter de mi línea de tiempo aquí. Para llegar a mi punto de partida, aquí está el código para leer un json del disco en undask.bag y luego convertir eso en undask.dataframe

import dask.bag as db
import dask.dataframe as dd
import json


b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()

El problema Todos mis eventos individuales (es decir, tweets) se registran como columnas y vice filas. En acuerdo contidy principios, me gustaría tener filas para cada evento.pandas tiene un método de transposición para marcos de datos y dask.array tiene un método de transposición para matrices. Mi objetivo es hacer la misma operación de transposición, pero en un marco de datos discontinuo. ¿Como podría hacerlo?

Convertir filas en columnasEditar para solución

Este código resuelve el problema de transposición original, limpia los archivos json de Twitter definiendo las columnas que desea conservar y eliminando el resto, y crea una nueva columna aplicando una función a una Serie. Luego, escribimos un archivo MUCHO más pequeño y limpio en el disco.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob

# pull in all files..
filenames = glob.glob('~/sampleTwitter*.json')


# df = ... # do work with dask.dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)


# see all the fields of the dataframe 
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']

# remove the fields i don't want from column list
for f in keepers:
    if f in fields:
        fields.remove(f)

# drop the fields i don't want and only keep whats necessary
df = df.drop(fields,axis=1)

clean = df.coordinates.apply(lambda x: (x['coordinates'][0],x['coordinates'][1]), meta= ('coords',tuple))
df['coords'] = clean

# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:
    newfilenames.append(re.search('(?<=\/).+?(?=\.)',l).group()+'cleaned.json')
#newfilenames

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
    return frame.to_json('./'+filename)

# converting back to a delayed object
dfs = df.to_delayed()
writes = [(delayed((saver)(df, fn))) for df, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)

Respuestas a la pregunta(1)

Su respuesta a la pregunta