будет использовать нового потребителя в этом случае (но вы не можете видеть заголовки с консольным потребителем, в любом случае). Но я согласен, что странно, что потребитель может повлиять на производителя. Возможно, есть какая-то логика, которая гласит: «У нас есть старый потребитель, присоединенный к этой теме, поэтому вы не можете отправлять заголовки»

ользую эту настройку docker-compose для локальной настройки Kafka:https://github.com/wurstmeister/kafka-docker/

docker-compose up работает отлично, создание тем через оболочку работает отлично.

Сейчас я пытаюсь подключиться к Кафке черезspring-kafka:2.1.0.RELEASE

При запуске приложения Spring оно печатает правильную версию Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

Я пытаюсь отправить сообщение, подобное этому

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Отправка на стороне клиента завершается неудачно с

UnknownServerException: The server experienced an unexpected error when processing the request

В консоли сервера я получаю сообщениеMagic v1 не поддерживает заголовки записей

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Поиск в Google предполагает конфликт версий, но версия кажется подходящей (org.apache.kafka:kafka-clients:1.0.0 находится в пути к классам).

Есть какие-нибудь подсказки? Спасибо!

Изменить: я сузил источник проблемы. Отправка простых строк работает, но отправка Json через JsonSerializer приводит к данной проблеме. Вот содержимое моего конфига производителя:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())
 Gary Russell23 дек. 2017 г., 18:18
Это не имеет смысла; (получение этого сообщения на стороне сервера). Если версия клиента была старше, она не отправляла бы никаких заголовков, поэтому все должно быть хорошо (я проверил, используя клиент 1.0.0 с брокером 0.10, и он работает, пока вы не пытаетесь отправлять заголовки). С клиентом 1.0.0 "пустой"RecordHeaders отправляется (клиентом), когда шаблон не отправляет заголовки.
 Hans Jespersen23 дек. 2017 г., 19:57
Кажется, что имена образов не указывают версию, поэтому вы можете использовать более старый кэшированный образ докера и более новый клиент. Клиент 1.0, отправляющий заголовки брокеру 0.10, получит эту ошибку. Попробуйте проверить версию образа докера, а докер извлечет новейший образ брокера 1.0.

Ответы на вопрос(3)

Решение Вопроса

не в некотором кеше Docker и не в приложении Spring.

Проблема была в консольном потребителе, который я использовал параллельно для отладки. Это был «старый» потребитель, начавший сkafka-console-consumer.sh --topic=topic --zookeeper=...

На самом деле он выводит предупреждение при запуске:Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

«Новый» потребитель с--bootstrap-server опция должна быть использована (особенно при использовании Kafka 1.0 с JsonSerializer). Примечание: использование старого потребителя здесь действительно может повлиять на производителя.

если мы используемJsonSerializer или жеJsonSerde для ценностей. Чтобы предотвратить эту проблему, нам нужно отключить добавление информационных заголовков.

если у вас все в порядке с сериализацией json по умолчанию, используйте следующее (ключевой момент здесьADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

но если вам нужен кастомJsonSerializer с конкретнымиObjectMapper (как сPropertyNamingStrategy.SNAKE_CASE), вы должны отключить добавление заголовков информацииJsonSerializer, как весенний кафка игнорируетDefaultKafkaProducerFactoryсобственностьADD_TYPE_INFO_HEADERS (как по мне это плохой дизайн весенней кафки)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

или если мы используемJsonSerde, тогда:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
 mojtab2302 июл. 2018 г., 17:06
Добавление этой строки решило мою проблемуprops.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

Я только что проверил этот образ докера без проблем ...

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

РЕДАКТИРОВАТЬ

У меня все еще работает ...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz
 Gary Russell23 дек. 2017 г., 22:21
Странно - что еще страннее, если я разговариваю с сервером 0.10.2.0 из S-K 2.1.0 иJsonSerializerЯ получаю исключение наклиент как я и ожидалCaused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers, Как я уже говорил в своем первоначальном комментарии к вашему вопросу, старые брокеры ничего не знают о заголовках, поэтому я не вижу, как можно получить эту ошибку на сервере, если клиент не старый. Вы используете свой клиент на AWS? До этого я слышал некоторые странности с кодом на AWS, работающем по-старомуkafka-clients даже если приложение упаковано с правильным.
 Gary Russell23 дек. 2017 г., 21:48
У меня все еще работает - см. Мое редактирование.JsonSerializer добавляет заголовки по умолчанию, так что это определенно похоже на то, что брокер в вашем образе докера <0.11. Вы можете подтвердить это, установив свойство производителяJsonSerializer.ADD_TYPE_INFO_HEADERS вfalse, Это свойство не позволяет добавлять информацию о типе в заголовках.
 Gary Russell23 дек. 2017 г., 22:02
Пытатьсяdocker logs <containerId> > server.log в моем я вижу это:[2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser).
 Gary Russell23 дек. 2017 г., 23:33
Попробуйте использовать--bootstrap-server вариант вместо--zookeeper;console-consumer будет использовать нового потребителя в этом случае (но вы не можете видеть заголовки с консольным потребителем, в любом случае). Но я согласен, что странно, что потребитель может повлиять на производителя. Возможно, есть какая-то логика, которая гласит: «У нас есть старый потребитель, присоединенный к этой теме, поэтому вы не можете отправлять заголовки»
 DerM23 дек. 2017 г., 23:19
Итак .. еще одно обновление. Я думаю, что проблема не в брокере и не в приложении Spring. Я использовал консольный потребитель параллельно с приложением Spring для отладки (на основеэтот урок ). Я почти уверен, что проблема возникает при использовании "старого" потребителя (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK) с параметром zookeper вместо загрузочного сервера. Что мне кажется интересным, так это то, что этот потребитель приводит к исключению UnknownServerException у (Spring) производителя.

Ваш ответ на вопрос