Это должен быть принятый ответ @battilanast.

настоящее время я работаю над своей диссертацией бакалавра, и в основном моя задача состоит в том, чтобы оптимизировать данный код в Go, то есть сделать его максимально быстрым. Сначала я оптимизировал последовательную функцию, а затем попытался ввести параллелизм с помощью процедур. После исследования в Интернете я теперь понимаю разницу между параллелизмом и параллелизмом благодаря следующим слайдам из

talks.golang, Я посетил несколько курсов параллельного программирования, где мы распараллеливали код на языке c / c ++ с помощью pthread / openmp, поэтому я попытался применить эти парадигмы в Go. Тем не менее, в этом конкретном случае я оптимизирую функцию, которая вычисляетскользящее среднее среза с длиной (равно 9393 или 10175), следовательно, мы имеемlen:=n+(window_size-1) окна, из которых мы вычисляем соответствующее среднее арифметическое и сохраняем это должным образом в выходном срезе.nОбратите внимание, что эта задача по своей сути смущает параллель.

Мои попытки оптимизации и результаты

В

 Я разделил ломтик наmoving_avg_concurrent2 меньшие кусочки и побежал каждый с одним горутин. Эта функция выполнялась с одной процедурой, по какой-то причине (пока не удалось выяснить, почему, но мы становимся касательной), лучше, чемnum_goroutines но с более чем одним goroutine он начал работать хуже, чемmoving_avg_serial4Вmoving_avg_serial4.
 Я принял парадигму мастер / работник. Производительность была хуже чемmoving_avg_concurrent3 при использовании одного горутина. Здесь у нас по крайней мере я получил лучшую производительность при увеличенииmoving_avg_serial4 но все же не лучше чемnum_goroutines, Чтобы сравнить выступленияmoving_avg_serial4 а такжеmoving_avg_serial4, moving_avg_concurrent2 Я написал тест и составил таблицу результатов:moving_avg_concurrent3Вопрос

fct & num_goroutines | timing in ns/op | percentage  
---------------------------------------------------------------------   
          serial4    |         4357893 |   100.00%  
          concur2_1  |         5174818 |   118.75%  
          concur2_4  |         9986386 |   229.16%  
          concur2_8  |        18973443 |   435.38%  
          concur2_32 |        75602438 |  1734.84%  
          concur3_1  |        32423150 |   744.01%  
          concur3_4  |        21083897 |   483.81%  
          concur3_8  |        16427430 |   376.96%  
          concur3_32 |        15157314 |   347.81%  
Поскольку, как упоминалось выше, эта проблема смущающе параллельна, я ожидал увидеть огромное увеличение производительности, но это не имело место.

Почему

 не масштабируется вообще?moving_avg_concurrent2И почему
 намного медленнее, чемmoving_avg_concurrent3Я знаю, что подпрограммы дешевы, но все еще не бесплатны, но возможно ли, что это порождает столько накладных расходов, что мы даже медленнее, чемmoving_avg_serial4?
Кодmoving_avg_serial4?

Функции:

тесты:

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !NaN_in_slice(buffer) && first_time {
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploid the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            //            fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !NaN_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// splitting up slice into smaller pieces for the goroutines but without using the serial version, i.e. we only have NaN's in the beginning, thus hope to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines
        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // computing boundaries
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // starting index
                // ending index
                if go_id != (num_goroutines - 1) {
                    stop = start + n // Ending index
                } else {
                    stop = num_items + (window_size - 1) // Ending index
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)
                idx := 1

                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)

            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input))
        for i := 0; i < window_size-1; i++ {
            output[i] = math.NaN()
        }

        pending := make(chan *Work)
        done := make(chan *Work)

        // creating work
        go func() {
            for i := 0; i < num_windows; i++ {
                pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
            }
        }()

        // start goroutines which work through pending till there is nothing left
        for i := 0; i < num_goroutines; i++ {
            go func() {
                Worker(pending, done)
            }()
        }

        // wait till every work is done
        for i := 0; i < num_windows; i++ {
            <-done
        }

        return output

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

Примечания:

//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res11 = r
}

var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res14 = r
}

var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res16 = r
}

var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res17 = r
}



func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
    benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
    benchmarkMoving_avg_serial4(b, 261*10)
}


func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}


func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############

Это мой самый первый пост, который я все еще изучаю, поэтому любая конструктивная критика также приветствуется.
а) Код не является оптимальным, есть оптимизация глазного яблока, даже без профилирования (например, вы запускаете последовательные циклы

 Adrian07 сент. 2017 г., 18:25
 это может быть объединено в одну). б) Вы делали какие-либо профилирование?for i := 0; i < num_goroutines; i++ является отличным инструментом и может дать некоторое представление о том, где именно происходит замедление. По крайней мере, предоставление результатов теста и результатов может помочь сообществу помочь вам.pprof@ Адриан, а) точка взята. б) Я выполнил профилирование процессора всей моей программы и сохранил PDF получившегося ориентированного графа, фактически именно поэтому я сейчас работаю над этой конкретной функцией. Не могли бы вы уточнить, как я должен предоставить контрольный показатель (поскольку вышеприведенного кажется недостаточным) и контрольные результаты?
 battilanast07 сент. 2017 г., 18:37
Вы уверены, что обработка очень
 reticentroot07 сент. 2017 г., 22:24
 user366619707 сент. 2017 г., 18:39
Скользящее среднее, используя любое количество немой силытолько на основе убеждения«... что эта задача по своей сути смущает параллель». даст более быстрые результаты, чем умная сила, использующая достаточно правильную, математически очень эффективную потоковую обработку временных рядов? Вы проверяли это на одном MCVE- {-code | -dataset}, чтобыустановить реалистичный тест производительности MA ()? (Мой вопрос основан на опыте обработки в реальном времени в квантовом / финском технологиях, который длится около двух десятилетий, когда случаются сильные штормы событий [мс] и должная забота о производительности является обязательной)en.wikipedia.org/wiki/Amdahl%27s_law
 user366619708 сент. 2017 г., 21:47
 StackOverflow призывает представитьПостановка проблемы на основе MCVEПозволяет провести повторное тестирование. Декларативные (голые) заявления здесь не имеют значения. Вы можете перечитать о том, как опубликоватьinimumMompleteCerifiableVxamples (добавление кода повторного запуска теста с обоими периодами MA (будучи?E?8200 ) и DataSET (может использовать синтетический DataSET, так потому что значения здесь не важны, но, очевидно, тот же размер + тип DataSET.aTimeSerieDATA = make( []float64, 10000, 10000 )Следующий: имея общий повторяемый тест, можно улучшить производительностьФакт № 0: Предварительно зрелые усилия по оптимизации часто имеют отрицательную доходность

Ответы на вопрос(1)

Решение Вопроса
показать, что это просто пустая трата времени и усилий
Почему?

Один
"неправильно" SLOC может опустошить производительность более чем до+ 37%или может улучшить производительность
 тратитьменее -57% базового времени обработкиПочему

51..017µs on MA(200) [10000]int
70.325µs on MA(200) [10000]float64

-s?[]intВы видите это сами по себе выше - это хлеб с маслом для эффективных стратегий обработки HPC / fintech sub [us] (и мы все еще говорим с точки зрения просто
 планирование процесса).[SERIAL]Это можно проверить в любом масштабе, но скорее

контрольная работа первый( Вот ) ваши собственные реализации,в том же масштабе - настроитьMA(200) [10000]float64 - а такжеопубликуйте ваши базовые длительности в чтобы просмотреть начальную производительность процесса и[us]сравнить яблоки с яблоками, разместив порог для сравнения.51.2 [us]Далее идет более сложная часть:

Факт № 1: эта задача НЕ смущающе параллельна

Да, можно пойти и реализовать

 вычисление скользящего среднего, так что оно действительно проходит через кучи данных, используя некоторое намеренно внушаемое «просто» - процесс обработки (независимо от того, из-за какой-то ошибки, некоторые[CONCURRENT]"совет"профессиональная слепота или просто двойственное сократское справедливое невежество), которое, очевидно, не означает, что природа сверточной обработки потока, присутствующая в математической формулировке Скользящее среднее, забыла быть чистой процесс, просто из-за попытки его применения вычисляется в некоторой степени "просто" -[SERIAL] обработка.[CONCURRENT](Кстати. Жесткие компьютерные ученые и ботаники с двумя доменами также будут здесь возражать, что язык Go спроектирован с использованием лучших навыков Роба Пайка, чтобы создать систему параллельных сопрограмм, а не какую-либо

 планирование процессов, даже несмотря на то, что CSP-инструменты Hoare, доступные в языковой концепции, могут добавить немного соли и перца и внедрить средства межпроцессного взаимодействия типа «стоп-блок», которые будут блокировать «просто» -[PARALLEL] фрагменты кода в некоторой аппаратной CSP-p2p-синхронизации. )[CONCURRENT]Факт № 2: распределяться по Go (для любого ускорения) только НА КОНЦЕ

Низкий уровень производительности в

 не устанавливает критерий. Имея разумное количество настроек производительности в однопоточном режиме, только тогда можно получить выгоду от распределения (все же придется платить дополнительные последовательные расходы, что делает закон Амдаля (скорее[SERIAL]Накладные-строгиЗакон Амдаля ) попасть в игру).Если можно ввести такое

низкий уровень дополнительных настроек-накладных расходова также по-прежнему достичь какого-либо замечательного параллелизма,не- часть обработки[SEQ]Там и только появляется шанс повысить эффективность процесса.Не трудно потерять намного больше, чем получить

 в этом, поэтому всегда сравните против потенциальных компромиссов между[SEQ] теоретическое ускорение, за которое нужно заплатить сумму суммы всех дополнительныхnon-[SEQ] / N[PAR]_processesнакладные расходы, так[SEQ]если и только если :Не имея такого преимущества для реактивных истребителей, как избыточная высота и Солнце позади вас, никогда не пытайтесь делать какие-либо попытки HPC / распараллеливания - они никогда не окупятся, не будучи замечательно

(         pure-[SEQ]_processing      [ns]
+       add-on-[SEQ]-setup-overheads [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
  ) << (  pure-[SEQ]_processing      [ns]
       + ( non-[SEQ]_processing      [ns] / 1 )
         )

 лучше, чем умный<<-процесс.[SEQ]Эпилог: на интерактивном эксперименте UI-строгого закона Амдала

Одна анимация стоит миллион слов.

интерактивный анимация еще лучше:Так,

принять тестируемый процесс, который имеет как
 и[SERIAL] часть графика процесса.[PARALLEL]Позволять

 бытьp доля длительности процесса[PARALLEL] Таким образом~ ( 0.0 .. 1.0 ) часть не длится дольше чем[SERIAL], правильно?( 1 - p )Итак, давайте начнем интерактивные эксперименты с такого теста, где

Это означает, что вся такая продолжительность процесса тратится всего наp == 1.0 часть, и как начальный последовательный, так и завершающие части потока процесса (которые в основном всегда[PARALLEL] ) имеют нулевую продолжительность[SERIAL]Предположим, что система не обладает какой-то особой магией и поэтому должна потратить некоторые реальные шаги на инициализацию каждого из( ( 1 - p ) == 0. )

 часть, чтобы запустить его на другом процессоре[PARALLEL]Итак, давайте добавим некоторые накладные расходы, если будет предложено реорганизовать поток процесса и выполнить маршалирование + распространение + отмену маршалинга всех необходимых инструкций и данных, чтобы намеченный процесс теперь мог запускаться и выполняться( (1), 2, .., N ) Процессоры параллельно.NЭти расходы называются

 (здесь изначально предполагается, что для простоты просто постоянным и инвариантнымo, что не всегда имеет место в реальном времени, на кремнии / на NUMA / на распределенных инфраструктурах).NЕсли щелкнуть заголовок «Эпилог» выше, откроется интерактивная среда, в которой можно бесплатно экспериментировать.

С участием

 производительность круто растет до текущей достижимойp == 1. && o == 0. && N > 1- аппаратные ограничения O / S для все еще монолитного выполнения кода O / S (где по-прежнему нет дополнительных затрат на распространение для MPI и аналогичных распределений рабочих режимов в режиме depeche-режима (где нужно было бы сразу добавить действительно большое количество[PARALLEL]в то время как наши до сих пор лучше всего[ms] реализация, очевидно,[SERIAL]сделал всю работу менее чем за ~ 22,1 [нас] )).Но кроме такого искусственно оптимистичного случая, работа не выглядит настолько дешевой, чтобы ее можно было эффективно распараллелить.

Попробуйте иметь не ноль, а примерно ~ 0,01% накладных расходов на установку

и линия начинает показывать совершенно иную природу масштабируемого с учетом накладных расходов масштабирования даже для крайней степениo дело (имея еще[PARALLEL] ) и иметь потенциальное ускорение где-то около половины первоначального супер-идеалистического случая линейного ускорения.p == 1.0Теперь включите

 к чему-то ближе к реальности, где-то менее искусственно установленный, чем первоначальный супер-идеалистический случайp а также== 1.00 --> { 0.99, 0.98, 0.95 }... Бинго, это реальность, где планирование процессов должно быть проверено и предварительно проверено.Что это обозначает?

Например, если накладные расходы (на запуск + окончательное присоединение к пулу сопрограмм) потребуют больше ~

 фактического0.1% при длительности секции обработки не будет большего ускорения в 4 раза (примерно 1/4 от первоначальной продолжительности во времени) для 5 сопрограмм (с p ~ 0,95), не более чем в 10 раз (в 10 раз быстрее) для 20 сопрограмм (все при условии, что система имеет 5-ядерные ядра, соответственно 20-ядерные процессоры свободны и доступны и готовы (лучше всего с процессами / потоками, сопоставленными с привязкой к CPU-core-уровням уровня O / S) для бесперебойного обслуживания всех этих сопрограмм во время весь их жизненный цикл, чтобы достичь любого ожидаемого ускорения.[PARALLEL]Отсутствие такого количества аппаратных ресурсов, свободных и готовых для всех тех целевых блоков, предназначенных для реализации

-часть графика процесса, состояния блокировки / ожидания будут вводить дополнительные абсолютные состояния ожидания, а результирующая производительность добавляет эти новые[PARALLEL]-блокирование / ожидание разделов общей длительности процесса и первоначально желаемого ускорения внезапно прекращают существовать, и коэффициент производительности значительно падает[SERIAL] (Это означает, что эффективное время выполнения было связано с тем, что состояния блокировки были медленнее, чем непараллельное<< 1.00 рабочий процесс).[SERIAL]Это может показаться сложным для новых увлеченных экспериментаторов, однако мы можем поставить это в обратную сторону. Учитывая весь процесс распространения предполагаемого

 известно, что пул задач не короче, чем, скажем,[PARALLEL]На графиках строгих издержек должно быть не менее10 [us] неблокирующих вычислений интенсивной обработки внутри1000 x 10 [us] раздел, чтобы не опустошать эффективность параллельной обработки.[PARALLEL]Если нет достаточно «жирной» части обработки, накладные расходы (значительно превышающие указанное выше пороговое значение

 ) затем жестоко опустошает чистую эффективность успешно распараллеленной обработки (но выполнив при таких неоправданно высоких относительных затратах на установку против ограниченных чистых эффектов любого~ 0.1%-процессоры, как было продемонстрировано в доступных live-графиках).NДля любителей распределенных вычислений нет ничего удивительного в том, что

 поставляется также с дополнительными зависимостями - наo (чем больше процессов, тем больше усилий нужно потратить на распределение рабочих пакетов), на размеры распределенных BLOB-объектов данных (чем больше BLOB, тем дольше блоки MEM- / IO-устройств остаются заблокированными, прежде чем обслуживать следующий процесс для получить распределенный BLOB через такое устройство / ресурс для каждого целевого объектаN-й процесс приема), по предотвращенным / сигнализированным CSP, опосредованным каналом межпроцессным координированию (назовите это дополнительной блокировкой для каждого инцидента, уменьшая2..N все дальше и дальше ниже, в конечном счете, хорошего идеалаp ).1.Таким образом, реальность реального мира весьма далека от изначально идеализированной, красивой и многообещающей

 а такжеp == 1.0, ( 1 - p ) == 0.0Как очевидно с самого начала, попробуйте побитьo == 0.0

 порог, чем пытаться обойти это, пока все хуже и хуже, если собираешься22.1 [us] [SERIAL] где реалистичные издержки и масштабирование, используя уже недостаточно эффективные подходы, не помогают ни на секунду.[PARALLEL]Это должен быть принятый ответ @battilanast.

 AndreaM1616 окт. 2017 г., 21:58
Почему эти goroutines не масштабируют свою производительность от более параллельных ка

Ваш ответ на вопрос