Kafka: Can't Create Multiple Stream Consumers

Właśnie zacząłem pracę z Kafką 0.8 beta 1. Mam bardzo prosty, działający przykład; problem polega na tym, że udaje mi się uruchomić tylko jednego konsumenta wiadomości, a nie kilku. Innymi słowy, metoda runSingleWorker() DZIAŁA, a metoda run() NIE DZIAŁA:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.truecar.inventory.worker.core.application.config.AppConfig;

/**
 * Owns one Kafka consumer connector for a single topic and hands the topic's
 * message streams to {@link Consumer} workers running on a fixed thread pool.
 *
 * <p>The connector is built from the {@code consumerConfig} bean of the Spring
 * context configured by {@code AppConfig}.
 */
public class ConsumerThreadPool {

  private final ConsumerConnector consumer;
  private final String topic;

  // Created lazily by run(); stays null if only runSingleWorker() is used.
  private ExecutorService executor;
  private static final ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

  /**
   * Creates the consumer connector for the given topic.
   *
   * @param topic name of the topic whose streams will be consumed
   */
  public ConsumerThreadPool(String topic) {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
    this.topic = topic;
  }

  /**
   * Shuts down the consumer connector and stops the worker threads.
   */
  public void shutdown() {
    if (consumer != null) {
      consumer.shutdown();
    }
    if (executor != null) {
      // The workers loop forever and only leave their loop when interrupted,
      // so a plain shutdown() would leave every pool thread running.
      executor.shutdownNow();
    }
  }

  /**
   * Requests {@code numThreads} streams for the topic and submits one
   * {@link Consumer} task per returned stream. Returns immediately; the
   * workers keep running until {@link #shutdown()} is called.
   *
   * @param numThreads number of streams to request and size of the thread pool
   */
  public void run(Integer numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(numThreads);

    // Iterate over the streams actually returned instead of indexing up to
    // numThreads, so a shorter list cannot cause an IndexOutOfBoundsException.
    int threadNumber = 0;
    for (KafkaStream<byte[], byte[]> stream : topicListeners) {
      executor.submit(new Consumer(stream, threadNumber));
      threadNumber++;
    }
  }

  /**
   * Consumes the topic through a single stream on the calling thread and
   * prints every message to standard output. Blocks until the calling thread
   * is interrupted.
   *
   * @param numThreads unused; exactly one stream is always requested
   */
  public void runSingleWorker(Integer numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (true) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and stop polling instead of
        // swallowing the request and looping forever.
        Thread.currentThread().interrupt();
        return;
      }
      while (it.hasNext()) {
        System.out.println(new String(it.next().message()));
      }
    }
  }
}

A oto kod mojego testowego (zabawkowego) konsumenta:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

/**
 * Toy worker that reads one Kafka stream and prints every message to
 * standard output, tagged with the worker's thread number.
 */
public class Consumer implements Runnable {

  private final KafkaStream<byte[], byte[]> kafkaStream;
  private final Integer threadNumber;

  /**
   * @param kafkaStream  stream this worker reads from
   * @param threadNumber label used in the printed output
   */
  public Consumer(KafkaStream<byte[], byte[]> kafkaStream, Integer threadNumber) {
    this.threadNumber = threadNumber;
    this.kafkaStream = kafkaStream;
  }

  /**
   * Polls the stream once per second and prints all available messages.
   * Leaves the loop when the thread is interrupted during the sleep.
   */
  @Override
  public void run() {
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    // NOTE(review): if this iterator inherits Scala's default Iterator.toString(),
    // the call below evaluates hasNext() and may block until a message arrives
    // or the connector is shut down - confirm before relying on this log line.
    System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
    while (true) {

      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Restore the flag so the executor can still observe the interrupt.
        Thread.currentThread().interrupt();
        break;
      }

      while (it.hasNext()) {
        System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
      }
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  }
}

Problem polega na tym, że pula wątków roboczych nie odbiera wiadomości:

Created iterator empty iterator thread number 3
Created iterator empty iterator thread number 6
Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 7
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 8
Created iterator empty iterator thread number 3
etc...

Gdy dodaję wiadomości za pomocą producenta uruchamianego z wiersza poleceń, są one drukowane w wersji z pojedynczym wątkiem, ale nie są drukowane w wersji z wieloma strumieniami. Co tu się dzieje? Jak mogę to naprawić?

Przy okazji: pom.xml dołączony do Kafki 0.8 nie jest poprawny i nie pobiera zależności, więc poniżej zamieszczam pom z kompletem zależności.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Kafka 0.8 consumer example; Kafka's runtime
     dependencies are declared explicitly below. -->
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
  http://maven.apache.org/POM/4.0.0
  http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>group1</groupId>
<artifactId>artifact1</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Single place to change the Spring version used by all Spring artifacts. -->
  <org.springframework.version>3.2.4.RELEASE</org.springframework.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${org.springframework.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${org.springframework.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.0-beta1</version>
  </dependency>
  <dependency>
    <groupId>javax.inject</groupId>
    <artifactId>javax.inject</artifactId>
    <version>1</version>
  </dependency>
  <!-- Declared explicitly alongside the Kafka artifact. -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.9.2</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.3</version>
  </dependency>
  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>
<build>
  <finalName>inventory-core</finalName>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.0</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>
    <!-- Writes the application entry point into the jar manifest. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <!-- Packages the application and its dependencies into a single "onejar" artifact. -->
    <plugin>
      <groupId>org.dstovall</groupId>
      <artifactId>onejar-maven-plugin</artifactId>
      <version>1.4.4</version>
      <executions>
        <execution>
          <configuration>
            <onejarVersion>0.97</onejarVersion>
            <classifier>onejar</classifier>
          </configuration>
          <goals>
            <goal>one-jar</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<pluginRepositories>
  <!-- NOTE(review): confirm this plugin repository URL is still reachable. -->
  <pluginRepository>
    <id>onejar-maven-plugin.googlecode.com</id>
    <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
  </pluginRepository>
</pluginRepositories>
</project>

questionAnswers(1)

yourAnswerToTheQuestion