? У меня точно такая же проблема, но не следите за тем, как это увеличивает параллелизм.

абота с потоковыми данными (2017-09-08_03_55_43-9675407418829265662) с помощьюApache Beam SDK for Java 2.1.0 не будет масштабироваться после 1 Worker даже с растущей очередью pubsub (сейчас 100 тыс. недоставленных сообщений) - есть ли у вас идеи почему?

В настоящее время работает сautoscalingAlgorithm=THROUGHPUT_BASED а такжеmaxNumWorkers=10.

Ответы на вопрос(2)

Решение Вопроса

ел, что она не масштабируется, потому что загрузка ЦП низка, а это означает, что что-то ограничивает производительность конвейера, например, внешнее регулирование. Масштабирование редко помогает в этих случаях.

Я вижу, что для обработки некоторых пакетов требуется несколько часов. Я рекомендую изучить логику вашего конвейера и посмотреть, есть ли другие части, которые можно оптимизировать.

 Brodin12 сент. 2017 г., 12:23
Вы связались с чем-то, что устарело - я должен переопределить это с моей собственной реализацией GroupBy / FlatMap?
 Scott Wegner12 сент. 2017 г., 17:51
Reshuffle transform помечен как устаревший, поскольку он является подсказкой для потока данных, а не концепцией модели Beam. Вы можете заново реализовать ту же логику самостоятельно, но Dataflow внутренне используетReshuffleTrigger для дополнительной оптимизации. ВозможноReshuffle преобразование может измениться или быть удалено в будущем, но не без замены аналогичным механизмом.
 Brodin11 сент. 2017 г., 14:25
Спасибо! Одним из шагов в конвейере является HTTP-запрос к другому сервису, который выполняет тяжелую обработку. Эта служба автоматически масштабируется и может обрабатывать более высокую нагрузку, чем текущая, однако швы потока данных ограничены. Превратив шаг потока данных в асинхронный HTTP-вызов, сможет ли он отправлять больше параллельных запросов?
 Raghu Angadi12 сент. 2017 г., 20:05
@ Бродин, пожалуйста, продолжайте и используйте перестановку. Хотя это помечено как устаревшее, это рекомендуемый способ перетасовать сообщения в потоке данных. Если и когда это будет удалено, будет простая замена для этого.
 Raghu Angadi11 сент. 2017 г., 19:31
Какую шкалу параллелизма вы ищете? С текущей настройкой вы уже получаете параллелизм около 4 * max_num_workers. DoFn может выдавать асинхронные вызовы, но более простым и гибким способом было бы добавить шаг «перестановки» [1]. Это позволяет вам легко контролировать максимальный параллельный запрос к вашему сервису (например, если вы используете random.nextInt (100) для тасования ключа, конвейер будет иметь до 100 параллельных запросов). [1]:beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/...

Вот чем я закончил:

import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.concurrent.ThreadLocalRandom;


public class ReshuffleWithRandomKey<T>
        extends PTransform<PCollection<T>, PCollection<T>> {

    private final int size;

    public ReshuffleWithRandomKey(int size) {
        this.size = size;
    }

    @Override
    public PCollection<T> expand(PCollection<T> input) {
        return input
                .apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
                .apply("Reshuffle", Reshuffle.<Integer, T>of())
                .apply("Values", Values.<T>create());
    }

    private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {

        private final int size;

        AssignRandomKeyFn(int size) {
            this.size = size;
        }

        @ProcessElement
        public void process(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
        }
    }
}

Как вы думаете, @ raghu-angadi и @ scott-wegner?

 Raghu Angadi13 сент. 2017 г., 23:02
Выглядит хорошо для меня. Незначительные вещи: вы можете использовать MapElement.via (), чтобы немного уменьшить код. Переименование «размера» во что-то «осколки» может лучше передать цель.
 David Adrian26 июн. 2018 г., 18:39
Вы подали заявкуReshuffleWithRandomKey непосредственно перед ParDo с блокирующим сетевым вызовом? Или вы встраивали ParDo междуReshuffle а такжеValues? У меня точно такая же проблема, но не следите за тем, как это увеличивает параллелизм.

Ваш ответ на вопрос