Поток Apache Flink в кластере не разделяет работу с работниками
Моя цель - настроить кластер с высокой пропускной способностью, используя Kafka в качестве источника и Flink в качестве механизма обработки потоков. Вот что я сделал.
Я настроил кластер с 2 узлами следующую конфигурацию на ведущем и ведомом устройстве.
Мастер Флинк-конф.ямл
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Раб Flink-Conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Файл ведомых на главном узле выглядит так:
<SLAVE_IP_ADDR>
localhost
Настройка flink на обоих узлах находится в папке с одинаковым именем. Я запускаю кластер на мастер, запустив
bin/start-cluster-streaming.sh
Это запускает диспетчер задач на подчиненном узле.
Мой источник ввода - Кафка. Вот фрагмент.
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
Вот моя функция Мойка
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
Вот зависимости Flink в моем pom.xml.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
Затем я запускаю упакованный jar с этой командой на мастер
bin/flink run flink-test-jar-with-dependencies.jar
Однако, когда я вставляю сообщения в тему Kafka, я могу учесть все сообщения, приходящие из моей темы Kafka (через отладочные сообщения в методе invoke моегоSinkFunction
реализация) только на главном узле.
В пользовательском интерфейсе диспетчера заданий я вижу 2 диспетчера задач, как показано ниже:
Почему подчиненные узлы не получают задачи?Я что-то пропустил?