будет использовать нового потребителя в этом случае (но вы не можете видеть заголовки с консольным потребителем, в любом случае). Но я согласен, что странно, что потребитель может повлиять на производителя. Возможно, есть какая-то логика, которая гласит: «У нас есть старый потребитель, присоединенный к этой теме, поэтому вы не можете отправлять заголовки»
ользую эту настройку docker-compose для локальной настройки Kafka:https://github.com/wurstmeister/kafka-docker/
docker-compose up
работает отлично, создание тем через оболочку работает отлично.
Сейчас я пытаюсь подключиться к Кафке черезspring-kafka:2.1.0.RELEASE
При запуске приложения Spring оно печатает правильную версию Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
Я пытаюсь отправить сообщение, подобное этому
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Отправка на стороне клиента завершается неудачно с
UnknownServerException: The server experienced an unexpected error when processing the request
В консоли сервера я получаю сообщениеMagic v1 не поддерживает заголовки записей
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Поиск в Google предполагает конфликт версий, но версия кажется подходящей (org.apache.kafka:kafka-clients:1.0.0
находится в пути к классам).
Есть какие-нибудь подсказки? Спасибо!
Изменить: я сузил источник проблемы. Отправка простых строк работает, но отправка Json через JsonSerializer приводит к данной проблеме. Вот содержимое моего конфига производителя:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
}
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())