Ein Kanalmultiplexer

Hinweis - Neuling in Go.

Ich habe einen Multiplexer geschriebensollte Füge die Ausgänge eines Arrays von Kanälen zu einem zusammen. Zufrieden mit konstruktiver Kritik.

func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    n := len(channels)
    // The channel to output to.
    ch := make(chan big.Int, n)

    // Make one go per channel.
    for _, c := range channels {
        go func() {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            n -= 1
            // Close output if all closed now.
            if n == 0 {
                close(ch)
            }
        }()
    }
    return ch
}

Ich teste es mit:

func fromTo(f, t int) chan big.Int {
    ch := make(chan big.Int)

    go func() {
        for i := f; i < t; i++ {
            fmt.Println("Feed:", i)
            ch <- *big.NewInt(int64(i))
        }
        close(ch)
    }()
    return ch
}

func testMux() {
    r := make([]chan big.Int, 10)
    for i := 0; i < 10; i++ {
        r[i] = fromTo(i*10, i*10+10)
    }
    all := Mux(r)
    // Roll them out.
    for l := range all {
        fmt.Println(l)
    }
}

aber meine Ausgabe ist sehr seltsam:

Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}

Also zu meinen Fragen:

Gibt es etwas, was ich in Mux falsch mache?Warum bekomme ich nur die letzten 10 von meinem Ausgabekanal?Warum sieht die Fütterung so seltsam aus? (1. jedes Eingangskanals, der gesamte letzte Kanal und dann nichts)Gibt es einen besseren Weg, dies zu tun?

Ich brauche alle Eingangskanäle, um die gleichen Rechte für den Ausgangskanal zu haben - d. H. Ich kann nicht alle Ausgänge von einem Kanal und dann alle vom nächsten usw. haben.

Für alle Interessierten - dies war der endgültige Code nach dem Fix und der korrekten (vermutlich) Verwendung vonsync.WaitGroup

import (
    "math/big"
    "sync"
)

/*
  Multiplex a number of channels into one.
*/
func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    var wg sync.WaitGroup
    wg.Add(len(channels))
    // The channel to output to.
    ch := make(chan big.Int, len(channels))

    // Make one go per channel.
    for _, c := range channels {
        go func(c <-chan big.Int) {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            wg.Done()
        }(c)
    }
    // Close the channel when the pumping is finished.
    go func() {
        // Wait for everyone to be done.
        wg.Wait()
        // Close.
        close(ch)
    }()
    return ch
}

Antworten auf die Frage(3)

Ihre Antwort auf die Frage