Spring Kafka Producer no envía a Kafka 1.0.0 (Magic v1 no admite encabezados de registro)

Estoy usando esta configuración docker-compose para configurar Kafka localmente:https://github.com/wurstmeister/kafka-docker/

docker-compose up funciona bien, crear temas a través de shell funciona bien.

Ahora trato de conectarme a Kafka a través despring-kafka:2.1.0.RELEASE

Al iniciar la aplicación Spring, imprime la versión correcta de Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

Intento enviar un mensaje como este

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

El envío del lado del cliente falla con

UnknownServerException: The server experienced an unexpected error when processing the request

En la consola del servidor recibo el mensajeMagic v1 no admite encabezados de registro

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Buscar en Google sugiere un conflicto de versión, pero la versión parece encajar (org.apache.kafka:kafka-clients:1.0.0 está en el classpath).

¿Alguna pista? ¡Gracias!

Editar: reduje la fuente del problema. Enviar cadenas simples funciona, pero enviar Json a través de JsonSerializer da como resultado el problema dado. Aquí está el contenido de mi configuración de productor:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

Respuestas a la pregunta(3)

Su respuesta a la pregunta