Spring Kafka Producer no envía a Kafka 1.0.0 (Magic v1 no admite encabezados de registro)
Estoy usando esta configuración docker-compose para configurar Kafka localmente:https://github.com/wurstmeister/kafka-docker/
docker-compose up
funciona bien, crear temas a través de shell funciona bien.
Ahora trato de conectarme a Kafka a través despring-kafka:2.1.0.RELEASE
Al iniciar la aplicación Spring, imprime la versión correcta de Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
Intento enviar un mensaje como este
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
El envío del lado del cliente falla con
UnknownServerException: The server experienced an unexpected error when processing the request
En la consola del servidor recibo el mensajeMagic v1 no admite encabezados de registro
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Buscar en Google sugiere un conflicto de versión, pero la versión parece encajar (org.apache.kafka:kafka-clients:1.0.0
está en el classpath).
¿Alguna pista? ¡Gracias!
Editar: reduje la fuente del problema. Enviar cadenas simples funciona, pero enviar Json a través de JsonSerializer da como resultado el problema dado. Aquí está el contenido de mi configuración de productor:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
}
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())