Что вы спасете мой день, удивительный ответ, можем ли мы установить customer.auto.offset.reset = как можно раньше для потребителя в kafka connect?
перезапуска задачи-приемника Kafka Connect S3 он возобновил запись с самого начала темы и записал дубликаты старых записей. Другими словами, Kafka Connect, казалось, потерял свое место.
Итак, я представляю, что Kafka Connect хранит информацию о текущем положении смещения во внутреннейconnect-offsets
тема. Эта тема пуста, и я полагаю, это часть проблемы.
Две другие внутренние темыconnect-statuses
а такжеconnect-configs
не пустые.connect-statuses
имеет 52 записи.connect-configs
имеет 6 записей; три для каждого из двух коннекторов мойки:connector-<name>
, task-<name>-0
, commit-<name>
.
Я вручную создал внутренние темы Kafka Connect, как указано в документации, перед запуском этого:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
Я могу убедиться, чтоconnect-offsets
Кажется, тема создана правильно:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
Это с кластером из трех серверов с Confluent Platform v3.2.1 под управлением Kafka 10.2.1.
Являетсяconnect-offsets
должен быть пустым? Зачем еще перезапуск Kafka Connect в начале темы при перезапуске задачи?
ОБНОВИТЬ: Ответ на ответ Рэндалла Хауха.
Объяснение относительно смещений разъема источника и смещения разъема приемника объясняется пустымconnect-offsets
, Спасибо за объяснение!Я определенно не меняю имя разъема.Если разъем не работает в течение ~ пяти дней и после этого перезапускается, есть ли какая-либо причина, по которой положение смещения разъема истекает и сбрасывается? понятно__consumer_offsets
имеетcleanup.policy=compact
auto.offset.reset
должен вступать в силу только если нет позиции в__consumer_offsets
, правильно?Я использую в основном системные настройки по умолчанию. Мой Sink-конфиг JSON выглядит следующим образом. Я использую очень простой пользовательский разделитель для разбиения на поле даты и времени Avro, а не время настенных часов. Эта функция, кажется, была добавлена в Confluent v3.2.2, поэтому мне не понадобится специальный плагин для этой функции. Я надеюсь пропустить Confluent v3.2.2 и перейти сразу к v3.3.0, когда он станет доступен.
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}