Wie transponieren Sie einen Dask-Datenrahmen (konvertieren Sie Spalten in Zeilen), um ordentliche Datenprinzipien zu erreichen

TLDR: Ich habe einen Dask-Datenrahmen aus einer Dask-Tasche erstellt. Der Dask-Datenrahmen behandelt jede Beobachtung (Ereignis) als Spalte. Anstatt also Datenzeilen für jedes Ereignis zu haben, habe ich für jedes Ereignis eine Spalte. Das Ziel ist es, die Spalten auf die gleiche Weise in Zeilen zu transponieren, wie Pandas mit df.T. @ einen Datenrahmen transponieren könne

Einzelheite: Ich habesample twitter Daten von meiner Timeline hier. Um zu meinem Ausgangspunkt zu gelangen, ist hier der Code zum Lesen eines JSON von der Festplatte in eindask.bag und konvertiere das dann in eindask.dataframe

import dask.bag as db
import dask.dataframe as dd
import json


b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()

Das Proble Alle meine individuellen Ereignisse (d. H. Tweets) werden als Spalten oder Zeilen aufgezeichnet. In Übereinstimmung mittidyach @ -Prinzipien hätte ich gerne Zeilen für jedes Ereignis.pandas hat eine Transponierungsmethode für Datenrahmen und dask.array haben eine Transponierungsmethode für Arrays. Mein Ziel ist es, die gleiche Transponierungsoperation durchzuführen, jedoch auf einem Dask-Datenrahmen. Wie würde ich das machen?

Zeilen in Spalten konvertierenEdit für Lösung

Dieser Code behebt das ursprüngliche Transponierungsproblem, bereinigt Twitter-JSON-Dateien, indem Sie die Spalten definieren, die Sie beibehalten möchten, und löscht den Rest. Durch Anwenden einer Funktion auf eine Serie wird eine neue Spalte erstellt. Dann schreiben wir eine VIEL kleinere, bereinigte Datei auf die Festplatte.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob

# pull in all files..
filenames = glob.glob('~/sampleTwitter*.json')


# df = ... # do work with dask.dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)


# see all the fields of the dataframe 
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']

# remove the fields i don't want from column list
for f in keepers:
    if f in fields:
        fields.remove(f)

# drop the fields i don't want and only keep whats necessary
df = df.drop(fields,axis=1)

clean = df.coordinates.apply(lambda x: (x['coordinates'][0],x['coordinates'][1]), meta= ('coords',tuple))
df['coords'] = clean

# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:
    newfilenames.append(re.search('(?<=\/).+?(?=\.)',l).group()+'cleaned.json')
#newfilenames

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
    return frame.to_json('./'+filename)

# converting back to a delayed object
dfs = df.to_delayed()
writes = [(delayed((saver)(df, fn))) for df, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)

Antworten auf die Frage(2)

Ihre Antwort auf die Frage