Кафка: не может создать многопотоковых потребителей

Я только начал работать с Kafka 0.8 beta 1. У меня есть очень простой пример, проблема в том, что я могу заставить работать только одного потребителя сообщений, а не нескольких. То есть метод runSingleWorker () работает. Метод run () НЕ РАБОТАЕТ:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.truecar.inventory.worker.core.application.config.AppConfig;

public class ConsumerThreadPool {

    private final ConsumerConnector consumer;
    private final String topic;

    private ExecutorService executor;
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

    public ConsumerThreadPool(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
        this.topic = topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(Integer numThreads) {
        Map topicCountMap = new HashMap();

        topicCountMap.put(topic, numThreads);
        Map consumerMap = consumer.createMessageStreams(topicCountMap);
        List topicListeners = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);

        for(Integer i = 0; i < numThreads; i++ ){
            KafkaStream stream =  topicListeners.get(i);
            executor.submit(new Consumer(stream, i));
        }
    }


    public void runSingleWorker(Integer numThreads) {
        Map topicCountMap = new HashMap();

        topicCountMap.put(topic, new Integer(1));

        Map consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream stream =  consumerMap.get(topic).get(0);
        ConsumerIterator it = stream.iterator();
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while(it.hasNext()){
                System.out.println(new String(it.next().message()));

            }
        }
    }
}

И внутри моего потребителя игрушек:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream kafkaStream;
    private Integer threadNumber;

    public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator it = kafkaStream.iterator();
        System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
        while(true) {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }

            while(it.hasNext()) {
                System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
            }
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}

Проблема в том, что пул рабочих не принимает сообщения:

Created iterator empty iterator thread number 3
Created iterator empty iterator thread number 6
Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 7
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 8
Created iterator empty iterator thread number 3
etc...

Когда я добавляю сообщения через командную строку продукта, сообщения печатаются в однопоточной рабочей версии, но сообщения не печатаются в многопотоковой ситуации. Что тут происходит? Как я могу это исправить?

Кстати, pom.xml для kafka 0.8 не является допустимым pom и не будет получать зависимости, поэтому здесь pom с полными зависимостями.



4.0.0
group1
artifact1
0.1.0
jar

    UTF-8
    3.2.4.RELEASE


    
        org.springframework
        spring-core
        3.2.4.RELEASE
    
    
        org.springframework
        spring-context
        3.2.4.RELEASE
    
    
        org.apache.kafka
        kafka_2.9.2
        0.8.0-beta1
    
    
        javax.inject
        javax.inject
        1
    
    
        org.scala-lang
        scala-library
        2.9.2
    
    
        log4j
        log4j
        1.2.17
    
    
        com.101tec
        zkclient
        0.3
    
    
        com.yammer.metrics
        metrics-core
        2.2.0
    


    inventory-core
    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.0
            
                1.7
                1.7
            
        
        
            org.apache.maven.plugins
            maven-jar-plugin
            
                
                    
                        com.truecar.inventory.worker.core.application.Starter
                    
                
            
        
        
            org.dstovall
            onejar-maven-plugin
            1.4.4
            
                
                    
                        0.97
                        onejar
                    
                    
                        one-jar
                    
                
            
        
    


    
        onejar-maven-plugin.googlecode.com
        http://onejar-maven-plugin.googlecode.com/svn/mavenrepo
    


Ответы на вопрос(1)

Ваш ответ на вопрос