PySpark: lea, asigne y reduzca desde un archivo de texto de registro multilínea con newAPIHadoopFile
Estoy intentando resolver un problema que es similar aesta publicación. Mis datos originales son un archivo de texto que contiene valores (observaciones) de varios sensores. Cada observación se proporciona con una marca de tiempo, pero el nombre del sensor se proporciona solo una vez, y no en cada línea. Pero hay varios sensores en un archivo.
Time MHist::852-YF-007
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time MHist::852-YF-008
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Por lo tanto, quiero configurar Hadoop para dividir el archivo en las líneas donde se proporciona la información del sensor. Luego lea el nombre del sensor (por ejemplo, 852-YF-007 y 852-YF-008) de esas líneas y use MapReduce para leer los valores de cada sensor en consecuencia.
Lo hice en Python (Jupyter Notebook):
sheet = sc.newAPIHadoopFile(
'/user/me/sample.txt',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)
sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())
sf.take(50)
La salida es así:
[[u'::852-YF-007\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0'],
[u'::852-YF-008\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0']]
Mi pregunta es, cómo seguir procesando esto para extraer el nombre del sensor y tener las líneas de valor para ese sensor. Algo le gusta esto
852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines
Las líneas en sí se dividirán en marcas de tiempo y valores más adelante. Pero estoy más interesado en dividir los nombres de los sensores de las líneas.