No escribir datos de compensación al cuidador del zoológico en kafka-storm

Estaba configurando un clúster de tormenta para calcular las tendencias en tiempo real y otras estadísticas, sin embargo, tengo algunos problemas al introducir la función de "recuperación" en este proyecto, al permitir el desplazamiento que fue leído por última vez por elkafka-spout (el código fuente dekafka-spout viene dehttps://github.com/apache/incubator-storm/tree/master/external/storm-kafka) ser recordado. Yo comienzo mikafka-spout De este modo:

BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

La configuración predeterminada debería hacer esto, pero creo que no lo está haciendo en mi caso, cada vez que inicio mi proyecto, elPartitionManager intenta buscar el archivo con las compensaciones, luego no se encuentra nada:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

Luego comienza a leer desde el último desplazamiento posible. Lo cual está bien si mi proyecto nunca falla, pero no exactamente lo que quería.

También miré un poco más en elPartitionManager clase que usaZkstate clase para escribir las compensaciones, desde este fragmento de código:

PartitionManeger

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}

ZkState

public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Pude ver que para el primer mensaje, elwriteBytes método entra en elif bloquea e intenta crear una ruta, luego para el segundo mensaje va alelse bloque, que parece estar bien. Pero cuando comienzo el proyecto nuevamente, aparece el mismo mensaje mencionado anteriormente. Nopartition information puede ser encontrado.

Respuestas a la pregunta(2)

Su respuesta a la pregunta