Мне пришлось добавить эту строку непосредственно перед вызовом опроса, кажется, что теперь все работает: kafkaConsumer.seekToBeginning (kafkaConsumer.assignment ())

денный ниже клиент Scala kafka не возвращает никаких событий изpoll вызов.

Тем не менее, тема правильная, и я вижу события, отправляемые в тему с помощью консоли:

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning

Я также вижу тему в моем примере кода Scala ниже, когда я прохожу через него с помощью отладчика и вызываюkafkaConsumer.listTopics()

Кроме того, это вызывается из одного модульного теста, так что я создаю только один экземпляр этой черты и потребителя (то есть другой экземпляр потребителя не может потреблять сообщения). Я также использую случайный group_id.

Что-то не так с приведенным ниже кодом / конфигурацией?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.util.Random

trait KafkaTest {

  val kafkaConsumerProperties = new Properties()

  kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

  kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

  kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

  kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

  def checkKafkaHasReceivedEvent(): Assertion = {

    val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
    ...
  }
}

Увеличение таймаута опроса также не помогает.

Ответы на вопрос(1)

Ваш ответ на вопрос