будет использовать нового потребителя в этом случае (но вы не можете видеть заголовки с консольным потребителем, в любом случае). Но я согласен, что странно, что потребитель может повлиять на производителя. Возможно, есть какая-то логика, которая гласит: «У нас есть старый потребитель, присоединенный к этой теме, поэтому вы не можете отправлять заголовки»

ользую эту настройку docker-compose для локальной настройки Kafka:https://github.com/wurstmeister/kafka-docker/

docker-compose up работает отлично, создание тем через оболочку работает отлично.

Сейчас я пытаюсь подключиться к Кафке черезspring-kafka:2.1.0.RELEASE

При запуске приложения Spring оно печатает правильную версию Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

Я пытаюсь отправить сообщение, подобное этому

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Отправка на стороне клиента завершается неудачно с

UnknownServerException: The server experienced an unexpected error when processing the request

В консоли сервера я получаю сообщениеMagic v1 не поддерживает заголовки записей

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Поиск в Google предполагает конфликт версий, но версия кажется подходящей (org.apache.kafka:kafka-clients:1.0.0 находится в пути к классам).

Есть какие-нибудь подсказки? Спасибо!

Изменить: я сузил источник проблемы. Отправка простых строк работает, но отправка Json через JsonSerializer приводит к данной проблеме. Вот содержимое моего конфига производителя:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

Ответы на вопрос(3)

Ваш ответ на вопрос