Extensión del TableInputFormat de Hadoop para escanear con un prefijo utilizado para la distribución de claves de marca de tiempo

Tengo una tabla hbase cuya clave es una marca de tiempo con un prefijo aleatorio de un byte para distribuir las claves para que los escaneos no tengan un punto de acceso. Estoy tratando de extenderTableInputFormat de modo que pueda ejecutar un solo MapReduce en la tabla con un rango, prefijando los 256 prefijos posibles para que se escaneen todos los rangos con el rango de marca de tiempo especificado. Sin embargo, mi solución no funciona, ya que siempre parece escanear el último prefijo (127) 256 veces. Algo debe ser compartido en todos los escaneos.

Mi código está abajo. ¿Algunas ideas?

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
    throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
    byte prefixedStartRow[] = new byte[startRow.length+1];
    byte prefixedStopRow[] = new byte[stopRow.length+1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

y

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

Respuestas a la pregunta(1)

Su respuesta a la pregunta