Reiniciar a tarefa de pia do Kafka Connect S3 perde a posição, reescreve completamente tudo

Após reiniciar uma tarefa de coletor do Kafka Connect S3, ele reiniciou a gravação desde o início do tópico e gravou cópias duplicadas dos registros mais antigos. Em outras palavras, o Kafka Connect pareceu perder seu lugar.

Então, imagino que o Kafka Connect armazene as informações atuais da posição de deslocamento noconnect-offsets tema. Esse tópico está vazio e, presumo, faz parte do problema.

Os outros dois tópicos internosconnect-statuses econnect-configs não estão vazios.connect-statuses possui 52 entradas.connect-configs tem 6 entradas; três para cada um dos dois conectores de coletor que eu configurei:connector-<name>, task-<name>-0, commit-<name>.

Criei manualmente os tópicos internos do Kafka Connect, conforme especificado nos documentos antes de executar isso:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

Eu posso verificar se oconnect-offsets O tópico parece ter sido criado corretamente:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

Isso ocorre com um cluster de três servidores executando o Confluent Platform v3.2.1 executando o Kafka 10.2.1.

Éconnect-offsets deveria estar vazio? Por que outro motivo o Kafka Connect seria reiniciado no início do tópico ao reiniciar uma tarefa?

ATUALIZAR: Resposta à resposta de Randall Hauch.

Explicação sobre deslocamentos do conector de origem versus deslocamentos do conector do coletor explica vazioconnect-offsets. Obrigado pela explicação!Definitivamente, não estou mudando o nome do conector.Se o conector estiver inoperante por ~ cinco dias e reiniciado posteriormente, existe algum motivo para que a posição de deslocamento do conector expire e reinicie? Entendo__consumer_offsets temcleanup.policy=compactauto.offset.reset só deve ter efeito se não houver posição no__consumer_offsets, direito?

Estou usando principalmente padrões do sistema. Minha configuração JSON do Sink é a seguinte. Estou usando um particionador personalizado muito simples para particionar em um campo de data e hora do Avro, em vez do tempo do relógio de parede. Esse recurso parece ter sido adicionado no Confluent v3.2.2 para que eu não precise de um plug-in personalizado para essa funcionalidade. Espero pular o Confluent v3.2.2 e ir direto para a v3.3.0 quando estiver disponível.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

questionAnswers(2)

yourAnswerToTheQuestion