Кафка: не может создать многопотоковых потребителей
Я только начал работать с Kafka 0.8 beta 1. У меня есть очень простой пример, проблема в том, что я могу заставить работать только одного потребителя сообщений, а не нескольких. То есть метод runSingleWorker () работает. Метод run () НЕ РАБОТАЕТ:
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import com.truecar.inventory.worker.core.application.config.AppConfig;
public class ConsumerThreadPool {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
public ConsumerThreadPool(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
this.topic = topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}
public void run(Integer numThreads) {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, numThreads);
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List topicListeners = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(numThreads);
for(Integer i = 0; i < numThreads; i++ ){
KafkaStream stream = topicListeners.get(i);
executor.submit(new Consumer(stream, i));
}
}
public void runSingleWorker(Integer numThreads) {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator it = stream.iterator();
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while(it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
}
}
И внутри моего потребителя игрушек:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream kafkaStream;
private Integer threadNumber;
public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
this.threadNumber = threadNumber;
this.kafkaStream = kafkaStream;
}
public void run() {
ConsumerIterator it = kafkaStream.iterator();
System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
while(it.hasNext()) {
System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
}
}
System.out.println("Shutting down Thread: " + threadNumber);
}
}
Проблема в том, что пул рабочих не принимает сообщения:
Created iterator empty iterator thread number 3
Created iterator empty iterator thread number 6
Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 7
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 8
Created iterator empty iterator thread number 3
etc...
Когда я добавляю сообщения через командную строку продукта, сообщения печатаются в однопоточной рабочей версии, но сообщения не печатаются в многопотоковой ситуации. Что тут происходит? Как я могу это исправить?
Кстати, pom.xml для kafka 0.8 не является допустимым pom и не будет получать зависимости, поэтому здесь pom с полными зависимостями.
4.0.0
group1
artifact1
0.1.0
jar
UTF-8
3.2.4.RELEASE
org.springframework
spring-core
3.2.4.RELEASE
org.springframework
spring-context
3.2.4.RELEASE
org.apache.kafka
kafka_2.9.2
0.8.0-beta1
javax.inject
javax.inject
1
org.scala-lang
scala-library
2.9.2
log4j
log4j
1.2.17
com.101tec
zkclient
0.3
com.yammer.metrics
metrics-core
2.2.0
inventory-core
org.apache.maven.plugins
maven-compiler-plugin
3.0
1.7
1.7
org.apache.maven.plugins
maven-jar-plugin
com.truecar.inventory.worker.core.application.Starter
org.dstovall
onejar-maven-plugin
1.4.4
0.97
onejar
one-jar
onejar-maven-plugin.googlecode.com
http://onejar-maven-plugin.googlecode.com/svn/mavenrepo