Это идиоматический пул рабочих потоков в Go?

Я пытаюсь написать простой рабочий пул с программами.

Является ли код, который я написал, идиоматическим? Если нет, то что должно измениться?Я хочу иметь возможность установить максимальное количество рабочих потоков равным 5 и блокировать, пока рабочий не станет доступным, если все 5 заняты. Как бы я расширил это, чтобы иметь только пул максимум 5 рабочих? Я порождаю статические 5 горутин и даю каждомуwork_channel?

код:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

func main() {
    var work_channel = make(chan string)
    var results_channel = make(chan string)

    // create goroutine per item in work_channel
    go func() {
        var c = 0
        var wg sync.WaitGroup
        for work := range work_channel {
            wg.Add(1)
            go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
            c++
        }
        wg.Wait()
        fmt.Println("closing results channel")
        close(results_channel)
    }()

    // add work to the work_channel
    go func() {
        for c := 'a'; c < 'z'; c++ {
            work_channel <- fmt.Sprintf("%c", c)
        }
        close(work_channel)
        fmt.Println("sent work to work_channel")
    }()

    for x := range results_channel {
        fmt.Printf("result: %s\n", x)
    }
}
 icza03 июл. 2016 г., 16:31
Посмотрите мой ответ, который показывает, как создать пул рабочих групп в конце (как альтернативное решение):Взломщик паролей MD5

Ответы на вопрос(2)

Вы можете реализовать счетный семафор, чтобы ограничить параллелизм выполнения.

var tokens = make(chan struct{}, 20)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    tokens <- struct{}{} // acquire a token before performing work
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    <-tokens // release the token
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

Это общая схема, используемая для ограничения количества работников. Конечно, вы можете изменить место выпуска / приобретения токенов в соответствии с вашим кодом.

 Anfernee03 июл. 2016 г., 19:12
Вы должны быть осторожны при упоминании параллелизма.go func(){}() не всегда гарантирует параллельность. Учитывая это, я думаю, что это самый простой способ ограничить количество горутин в полете.
Решение Вопроса

Ваше решение ни в коем случае не является рабочим пулом подпрограмм: ваш код не ограничивает одновременные подпрограммы и не «повторно использует» подпрограммы (он всегда запускает новую при получении нового задания).

Модель производитель-потребитель

Как размещено наВзломщик паролей MD5, вы можете использоватьмодель производитель-потребитель, Вы могли бы иметь назначенныйрежиссер программа, которая будет генерировать задания (что делать / рассчитывать) и отправлять их наработы канал. Вы могли бы иметь фиксированный пулпотребитель программы (например, 5 из них), которые будут проходить по каналу, по которому доставляются задания, и каждая из них будет выполнять / завершать полученные задания.

режиссер Горутин может просто закрытьjobs канал, когда все задания были созданы и отправлены, правильно сигнализируяпотребители что больше не будет рабочих мест.for ... range Конструкция на канале обрабатывает событие «close» и правильно завершается. Обратите внимание, что все задания, отправленные до закрытия канала, все равно будут доставлены.

Это привело бы к чистому дизайну, привело бы к фиксированному (но произвольному) количеству подпрограмм и всегда использовало бы 100% ЦП (если число подпрограмм больше, чем число ядер ЦП). Он также имеет преимущество в том, что его можно «регулировать» при правильном выборе пропускной способности канала (буферизованного канала) и количествапотребитель goroutines.

Обратите внимание, что эта модель иметь назначенный производитель программы не является обязательным. Вы также можете иметь несколько процедур для создания рабочих мест, но тогда вы должны синхронизировать их, чтобы только закрытьjobs канал, когда все продюсерские программы завершены для создания рабочих мест - в противном случае пытается отправить другую работу наjobs канал, когда он уже закрыт, вызывает панику во время выполнения. Обычно создание рабочих мест обходится дешево и может производиться гораздо быстрее, чем их выполнение, поэтому такая модель для производства их за 1 цикл, в то время как многие ее потребляют / выполняют, хороша на практике.

Обработка результатов:

Если работа дает результаты, вы можете выбратьрезультат канал, по которому могут быть доставлены результаты («отосланы назад»), или вы можете выбрать обработку результатов у потребителя, когда задание будет выполнено / завершено. Последнее может быть даже реализовано с помощью функции «обратного вызова», которая обрабатывает результаты. Важным моментом является то, могут ли результаты обрабатываться независимо или их нужно объединять (например, карта-сокращение) или объединять.

Если вы идете сresults канал, вам также нужна процедура, которая получает значения от него, предотвращая блокировку потребителей (произойдет, если буферresults будет заполнен).

Сresults канал

Вместо того, чтобы отправлять простоstring значения как задания и результаты, я бы создал тип-обертку, который может содержать любую дополнительную информацию, и поэтому он гораздо более гибок:

type Job struct {
    Id     int
    Work   string
    Result string
}

Обратите внимание, чтоJob Структура также оборачивает результат, поэтому, когда мы отправляем результат, он также содержит оригиналJob как контекст -часто очень полезно, Также обратите внимание, что выгодно просто отправлять указатели (*Job) на каналах вместоJob значения, поэтому нет необходимости делать «бесчисленные» копииJobс, а также размерJob значение структуры становится неактуальным.

Вот как может выглядеть этот производитель-потребитель:

Я бы использовал 2sync.WaitGroup ценности, их роль будет следовать:

var wg, wg2 sync.WaitGroup

Производитель несет ответственность за создание заданий для выполнения:

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

Когда сделано (больше нет работы),jobs канал закрыт, что сигнализирует потребителям, что больше рабочих мест не будет.

Обратите внимание, чтоproduce() видитjobs канал какотправить толькопотому что это то, что производитель должен делать только с этим:Отправить работы на нем (кромезакрытие это, но это также разрешено наотправить только канал). Случайный прием в производителе будет ошибкой времени компиляции (обнаружен рано, во время компиляции).

Ответственность потребителя состоит в том, чтобы получать рабочие места, пока рабочие места могут быть получены, и выполнять их:

func consume(id int, jobs <-chan *Job, results chan<- *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
        results <- job
    }
}

Обратите внимание, чтоconsume() видитjobs канал какполучать только; потребителю нужно толькоПолучать от него. Точно так жеresults каналотправить только для потребителя.

Также обратите внимание, чтоresults каналне могу закройте здесь, так как есть множество потребительских программ, и только первая попытка закрыть его будет успешной, а дальнейшие вызовут панику во время выполнения!results канал может (должен) быть закрыт после того, как все потребительские программы закончились, потому что тогда мы можем быть уверены, что дальнейшие значения (результаты) не будут отправлены наresults канал.

У нас есть результаты, которые необходимо проанализировать:

func analyze(results <-chan *Job) {
    defer wg2.Done()
    for job := range results {
        fmt.Printf("result: %s\n", job.Result)
    }
}

Как вы можете видеть, это также получает результаты, пока они могут прийти (доresults канал закрыт).results канал для анализатора естьполучать только.

Обратите внимание на использование типов каналов: когда это достаточно, используйте толькооднонаправленный тип канала для раннего обнаружения и предотвращения ошибок во время компиляции. Использовать толькодвунаправленный тип канала, если вам нужны оба направления.

И вот как все они склеены:

func main() {
    jobs := make(chan *Job, 100)    // Buffered channel
    results := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs, results)
    }
    // Start producing
    go produce(jobs)

    // Start analyzing:
    wg2.Add(1)
    go analyze(results)

    wg.Wait() // Wait all consumers to finish processing jobs

    // All jobs are processed, no more values will be sent on results:
    close(results)

    wg2.Wait() // Wait analyzer to analyze all results
}

Пример вывода:

Вот пример вывода:

Как видите, результаты приходят и анализируются до того, как все задания будут поставлены в очередь:

worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms

Попробуйте полное приложение наGo Playground.

Безresults канал

Код значительно упрощается, если мы не используемresults канал, но пользовательские программы обрабатывают результат сразу (напечатайте его в нашем случае). В этом случае нам не нужно 2sync.WaitGroup значения (2-е необходимо было только дождаться завершения анализа).

Безresults Направить полное решение так:

var wg sync.WaitGroup

type Job struct {
    Id   int
    Work string
}

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

func consume(id int, jobs <-chan *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
    }
}

func main() {
    jobs := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs)
    }
    // Start producing
    go produce(jobs)

    wg.Wait() // Wait all consumers to finish processing jobs
}

Выход "как", что сresults канал (но, конечно, порядок выполнения / завершения является случайным).

Попробуйте этот вариант наGo Playground.

 Markus W Mahlberg26 дек. 2018 г., 19:05
Разве это не фанат?
 icza26 дек. 2018 г., 19:07
@MarkusWMahlberg Вы правы, фанат - это когда несколько функций читают канал, пока он не закрыт. Хотя это также не используется в ответе, есть только один потребитель.
 AmaJayJB07 мар. 2018 г., 21:06
Я хотел бы удвоить / тройной / четверной как это!
 Markus W Mahlberg26 дек. 2018 г., 12:03
Разве фактическая терминология Go не называет это конвейером?
 icza08 мар. 2018 г., 11:46
@AmaJayJB Ты прав, хороший улов. Благодарю. Я исправил это, удаливwg.Done() от производителя.
 icza26 дек. 2018 г., 19:05
@MarkusWMahlberg Разветвление используется, когда несколько каналов объединены в один. Это не используется в ответе.
 Scott Frazer03 июл. 2016 г., 21:14
Вау, удивительно! Спасибо!
 Anfernee03 июл. 2016 г., 19:50
Мне нравится этот ответ. Зачем удалять и создавать новые программы, если они все делают одно и то же? Хотя горутины легки и дешевы, мы не должны принимать это как должное. Я считаю, что это хорошая практика.
 AmaJayJB07 мар. 2018 г., 23:44
Во-первых, спасибо, это потрясающе. Я думаю, что у вас может быть один дополнительныйwg.Done() затемwg.Add(1) так как тыwg.Add(1) для каждого потребителя и отложитьwg.Done() для каждого потребителя, но вы такжеwg.Done() для производства. Это означает, чтоwg.Wait() может не ждать, пока все 5 потребителей закончат. Просто проверяю?

Ваш ответ на вопрос