PySpark: lea, asigne y reduzca desde un archivo de texto de registro multilínea con newAPIHadoopFile

Estoy intentando resolver un problema que es similar aesta publicación. Mis datos originales son un archivo de texto que contiene valores (observaciones) de varios sensores. Cada observación se proporciona con una marca de tiempo, pero el nombre del sensor se proporciona solo una vez, y no en cada línea. Pero hay varios sensores en un archivo.

Time    MHist::852-YF-007   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time    MHist::852-YF-008   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0

Por lo tanto, quiero configurar Hadoop para dividir el archivo en las líneas donde se proporciona la información del sensor. Luego lea el nombre del sensor (por ejemplo, 852-YF-007 y 852-YF-008) de esas líneas y use MapReduce para leer los valores de cada sensor en consecuencia.

Lo hice en Python (Jupyter Notebook):

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)

sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())

sf.take(50)

La salida es así:

[[u'::852-YF-007\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0'],
 [u'::852-YF-008\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0']]

Mi pregunta es, cómo seguir procesando esto para extraer el nombre del sensor y tener las líneas de valor para ese sensor. Algo le gusta esto

852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines

Las líneas en sí se dividirán en marcas de tiempo y valores más adelante. Pero estoy más interesado en dividir los nombres de los sensores de las líneas.

Respuestas a la pregunta(1)

Su respuesta a la pregunta