Reiniciar Kafka Connect S3 Sink Task pierde posición, reescribe completamente todo

Después de reiniciar una tarea de sumidero de Kafka Connect S3, reinició la escritura desde el principio del tema y escribió copias duplicadas de registros anteriores. En otras palabras, Kafka Connect parecía perder su lugar.

Por lo tanto, me imagino que Kafka Connect almacena la información actual de posición de compensaciónconnect-offsets tema. Ese tema está vacío, lo que supongo que es parte del problema.

Los otros dos temas internos.connect-statuses yconnect-configs No están vacíos.connect-statuses tiene 52 entradas.connect-configs tiene 6 entradas; tres para cada uno de los dos conectores de sumidero que he configurado:connector-<name>, task-<name>-0, commit-<name>.

Creé manualmente los temas internos de Kafka Connect como se especifica en los documentos antes de ejecutar esto:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

Puedo verificar que elconnect-offsets El tema parece haber sido creado correctamente:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

Esto es con un clúster de tres servidores que ejecuta Confluent Platform v3.2.1 con Kafka 10.2.1.

Esconnect-offsets se supone que esta vacio? ¿Por qué si no se reinicia Kafka Connect al comienzo del tema al reiniciar una tarea?

ACTUALIZAR: Respuesta a la respuesta de Randall Hauch.

Explicación sobre las compensaciones del conector de origen frente a las compensaciones del conector de sumidero explica vacíoconnect-offsets. ¡Gracias por la explicación!Definitivamente no estoy cambiando el nombre del conector.Si el conector está inactivo durante ~ cinco días y se reinicia después, ¿hay alguna razón por la cual la posición de desplazamiento del conector caducaría y se restablecería? Veo__consumer_offsets tienecleanup.policy=compactauto.offset.reset solo debería tener efecto si no hay posición en__consumer_offsets, ¿derecho?

Estoy usando principalmente los valores predeterminados del sistema. My Jink config JSON es el siguiente. Estoy usando un particionador personalizado muy simple para particionar en un campo de fecha y hora de Avro en lugar del tiempo de reloj de pared. Esa característica parece haber sido agregada en Confluent v3.2.2 para que no necesite un complemento personalizado para esa funcionalidad. Espero omitir Confluent v3.2.2 e ir directamente a v3.3.0 cuando esté disponible.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

Respuestas a la pregunta(2)

Su respuesta a la pregunta