Что вы спасете мой день, удивительный ответ, можем ли мы установить customer.auto.offset.reset = как можно раньше для потребителя в kafka connect?

перезапуска задачи-приемника Kafka Connect S3 он возобновил запись с самого начала темы и записал дубликаты старых записей. Другими словами, Kafka Connect, казалось, потерял свое место.

Итак, я представляю, что Kafka Connect хранит информацию о текущем положении смещения во внутреннейconnect-offsets тема. Эта тема пуста, и я полагаю, это часть проблемы.

Две другие внутренние темыconnect-statuses а такжеconnect-configs не пустые.connect-statuses имеет 52 записи.connect-configs имеет 6 записей; три для каждого из двух коннекторов мойки:connector-<name>, task-<name>-0, commit-<name>.

Я вручную создал внутренние темы Kafka Connect, как указано в документации, перед запуском этого:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

Я могу убедиться, чтоconnect-offsets Кажется, тема создана правильно:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

Это с кластером из трех серверов с Confluent Platform v3.2.1 под управлением Kafka 10.2.1.

Являетсяconnect-offsets должен быть пустым? Зачем еще перезапуск Kafka Connect в начале темы при перезапуске задачи?

ОБНОВИТЬ: Ответ на ответ Рэндалла Хауха.

Объяснение относительно смещений разъема источника и смещения разъема приемника объясняется пустымconnect-offsets, Спасибо за объяснение!Я определенно не меняю имя разъема.Если разъем не работает в течение ~ пяти дней и после этого перезапускается, есть ли какая-либо причина, по которой положение смещения разъема истекает и сбрасывается? понятно__consumer_offsets имеетcleanup.policy=compactauto.offset.reset должен вступать в силу только если нет позиции в__consumer_offsets, правильно?

Я использую в основном системные настройки по умолчанию. Мой Sink-конфиг JSON выглядит следующим образом. Я использую очень простой пользовательский разделитель для разбиения на поле даты и времени Avro, а не время настенных часов. Эта функция, кажется, была добавлена ​​в Confluent v3.2.2, поэтому мне не понадобится специальный плагин для этой функции. Я надеюсь пропустить Confluent v3.2.2 и перейти сразу к v3.3.0, когда он станет доступен.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

Ответы на вопрос(1)

Ваш ответ на вопрос