Estendendo o TableInputFormat do Hadoop para digitalizar com um prefixo usado para distribuição de chaves de registro de data e hora

Eu tenho uma tabela hbase cuja chave é um timestamp com um prefixo aleatório de um byte para distribuir as chaves para que as varreduras não fiquem ativas. Estou tentando estenderTableInputFormat para que eu possa executar um único MapReduce na tabela com um intervalo, prefixando todos os 256 prefixos possíveis para que todos os intervalos com o intervalo de carimbo de data e hora especificado sejam verificados. Minha solução não está funcionando, pois parece sempre verificar o último prefixo (127) 256 vezes. Algo deve ser compartilhado em todas as varreduras.

Meu código está abaixo. Alguma ideia?

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
    throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
    byte prefixedStartRow[] = new byte[startRow.length+1];
    byte prefixedStopRow[] = new byte[stopRow.length+1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

e

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

questionAnswers(1)

yourAnswerToTheQuestion