Qual é a maneira mais eficiente de fazer uma redução classificada no PySpark?
Estou analisando os registros de desempenho pontual dos voos domésticos nos EUA a partir de 2015. Preciso agrupar por número da cauda e armazenar uma lista ordenada por data de todos os voos para cada número da cauda em um banco de dados, a serem recuperados pelo meu aplicativo. Não tenho certeza de qual das duas opções para alcançar essa é a melhor.
# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x:
(x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
Eu posso conseguir isso de uma forma reduzida ...
# Group flights by tail number, sorted by date, then flight number, then
origin/dest
flights_per_airplane = flights\
.map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
.reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))
Ou posso consegui-lo em um trabalho de mapa subsequente ...
# Do same in a map step, more efficient or does pySpark know how to optimize the above?
flights_per_airplane = flights\
.map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
.reduceByKey(lambda a, b: a + b)\
.map(lambda tuple:
(
tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4])))
)
Fazer isso na redução parece realmente ineficiente, mas na verdade ambos são muito lentos. classificado () parece com a maneira de fazer isso nos documentos do PySpark, então eu estou querendo saber se o PySpark não faz isso kosher internamente? Qual opção é a mais eficiente ou a melhor opção por algum outro motivo?
Meu código também está em uma essência aqui:https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade
Se você está curioso sobre os dados, é do Bureau of Transportation Statistics, aqui:http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time