O streaming do Apache Flink no cluster não divide tarefas com trabalhadores
Meu objetivo é configurar um cluster de alto rendimento usando o Kafka como fonte e o Flink como o mecanismo de processamento de fluxo. Aqui está o que eu fiz.
Eu configurei um cluster de 2 nós com a seguinte configuração no mestre e no escravo.
Mestre flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Escravo flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
O arquivo escravos no nó Mestre se parece com isso:
<SLAVE_IP_ADDR>
localhost
A configuração do flink nos dois nós está em uma pasta com o mesmo nome. Inicio o cluster no mestre executando
bin/start-cluster-streaming.sh
Isso inicia o gerenciador de tarefas no nó escravo.
Minha fonte de entrada é Kafka. Aqui está o trecho.
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
Aqui está a minha função Sink
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
Aqui estão as dependências do Flink no meu pom.xml.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
Então eu executo o jar empacotado com este comando no master
bin/flink run flink-test-jar-with-dependencies.jar
No entanto, quando insiro mensagens no tópico Kafka, sou capaz de responder a todas as mensagens provenientes do meu tópico Kafka (por meio de mensagens de depuração no método de chamada do meuSinkFunction
implementação) somente no nó Mestre.
Na interface do gerenciador de tarefas, eu consigo ver 2 gerenciadores de tarefas, como abaixo:
Por que os nós escravos não estão recebendo as tarefas?Estou perdendo alguma configuração?