Cómo crear listas (o matrices) de canales en Haskell, como Go

Estoy tratando de comparar Haskell con Go para algunos ejemplos de concurrencia, el siguiente código es un ejemplo simple de reducción de mapas en Go usando Goroutines y canales. El siguiente código Go calcula la suma de cuadrados:

1 ^ 2 + 2 ^ 2 + 3 ^ 2 .... 1024 ^ 2

Para probar el rendimiento de Go y Haskell, repito calcular la suma de cuadrados para R veces (10).

package main

import "fmt"

func mapper(in chan int, out chan int) {
    for v := range in {out <- v*v}
}

func reducer(in1, in2 chan int, out chan int) {
    for i1 := range in1 {i2 := <- in2; out <- i1 + i2}
}

func main() {
    const N = 1024  // calculate sum of squares up to N; N must be power of 2
    const R = 10  // number of repetitions to fill the "pipe"

    var r [N*2]chan int
    for i := range r {r[i] = make(chan int)}
    var m [N]chan int
    for i := range m {m[i] = make(chan int)}

    for i := 0; i < N; i++ {go mapper(m[i], r[i + N])}
    for i := 1; i < N; i++ {go reducer(r[i * 2], r[i *2 + 1], r[i])}

    go func () {
        for j := 0; j < R; j++ {
            for i := 0; i < N; i++ {m[i] <- i + 1} 
        }
    } ()

    for j := 0; j < R; j++ {
        <- r[1]
    } 
}

La pregunta es cómo implementar este ejemplo de mapreduce en Haskell de manera eficiente. El siguiente código de Haskell intenta calcular 10 ^ 2 + 7 ^ 2 en la función principal. Mi pregunta es cómo crear una matriz (o una lista) de canales, como Go, luego conectar los hilos del mapeador y el reductor todos juntos en la función principal.

import Control.Concurrent
data MRchannel = MRchannel !(MVar MRcmd)
data MRcmd = Pass !Int | Add !Int
  deriving (Show)

mapper:: MRchannel -> MRchannel -> IO ()
mapper left_C@(MRchannel left) right_C@(MRchannel right) = do
    v <- takeMVar left
    case v of
        Pass x -> do
            putMVar right (Add (x*x))
            mapper left_C right_C
        otherwise -> do
            putStrLn "Error!"
            return ()

reducer::  MRchannel -> MRchannel -> MRchannel -> IO ()
reducer left_1_C@(MRchannel left_1) left_2_C@(MRchannel left_2) 
right_C@(MRchannel right) = do
    v1 <- takeMVar left_1
    case v1 of
        Add x1 -> do
            v2 <- takeMVar left_2
            case v2 of
                Add x2 -> do 
                    putMVar right (Add (x1+x2))
                    reducer left_1_C left_2_C right_C
                otherwise -> do
                    putStrLn "Error!"
                    return ()
        otherwise -> do
            putStrLn "Error!"
            return ()

main = do
m1_l <- newEmptyMVar
m2_l <- newEmptyMVar
r1_l1 <- newEmptyMVar
r1_l2 <- newEmptyMVar
r1_r <- newEmptyMVar
    let m1_input = MRchannel m1_l
    let m2_input = MRchannel m2_l
    let r1_input1 = MRchannel r1_l1
    let r1_input2 = MRchannel r1_l2
    let r1_output = MRchannel r1_r
    forkIO $ mapper m1_input r1_input1
    forkIO $ mapper m2_input r1_input2
    forkIO $ reducer r1_input1 r1_input2 r1_output

    putMVar m1_l (Pass 10)
    putMVar m2_l (Pass 7)

    y <- takeMVar r1_r
    case y of 
        Add kvalue  -> do
            putStrLn $ show kvalue
        otherwise -> do
            putStrLn "Error"
            return () 

Respuestas a la pregunta(3)

Su respuesta a la pregunta