Reiniciar a tarefa de pia do Kafka Connect S3 perde a posição, reescreve completamente tudo
Após reiniciar uma tarefa de coletor do Kafka Connect S3, ele reiniciou a gravação desde o início do tópico e gravou cópias duplicadas dos registros mais antigos. Em outras palavras, o Kafka Connect pareceu perder seu lugar.
Então, imagino que o Kafka Connect armazene as informações atuais da posição de deslocamento noconnect-offsets
tema. Esse tópico está vazio e, presumo, faz parte do problema.
Os outros dois tópicos internosconnect-statuses
econnect-configs
não estão vazios.connect-statuses
possui 52 entradas.connect-configs
tem 6 entradas; três para cada um dos dois conectores de coletor que eu configurei:connector-<name>
, task-<name>-0
, commit-<name>
.
Criei manualmente os tópicos internos do Kafka Connect, conforme especificado nos documentos antes de executar isso:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
Eu posso verificar se oconnect-offsets
O tópico parece ter sido criado corretamente:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
Isso ocorre com um cluster de três servidores executando o Confluent Platform v3.2.1 executando o Kafka 10.2.1.
Éconnect-offsets
deveria estar vazio? Por que outro motivo o Kafka Connect seria reiniciado no início do tópico ao reiniciar uma tarefa?
ATUALIZAR: Resposta à resposta de Randall Hauch.
Explicação sobre deslocamentos do conector de origem versus deslocamentos do conector do coletor explica vazioconnect-offsets
. Obrigado pela explicação!Definitivamente, não estou mudando o nome do conector.Se o conector estiver inoperante por ~ cinco dias e reiniciado posteriormente, existe algum motivo para que a posição de deslocamento do conector expire e reinicie? Entendo__consumer_offsets
temcleanup.policy=compact
auto.offset.reset
só deve ter efeito se não houver posição no__consumer_offsets
, direito?Estou usando principalmente padrões do sistema. Minha configuração JSON do Sink é a seguinte. Estou usando um particionador personalizado muito simples para particionar em um campo de data e hora do Avro, em vez do tempo do relógio de parede. Esse recurso parece ter sido adicionado no Confluent v3.2.2 para que eu não precise de um plug-in personalizado para essa funcionalidade. Espero pular o Confluent v3.2.2 e ir direto para a v3.3.0 quando estiver disponível.
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}