@chi Большое спасибо. Я изменил свой код в соответствии с вашими предложениями. Он работает в 10 раз быстрее, чем предыдущая реализация.

аюсь сравнить Haskell с Go для некоторых примеров параллелизма. Следующий код представляет собой простой пример сокращения карт в Go с использованием распорядителей и каналов. Следующий код Go вычисляет сумму квадратов:

2 ^ 1 + 2 ^ 2 + 3 ^ 2 .... 1024 ^ 2

Чтобы проверить производительность Go и Haskell, я повторяю вычисление суммы квадратов для R раз (10).

package main

import "fmt"

func mapper(in chan int, out chan int) {
    for v := range in {out <- v*v}
}

func reducer(in1, in2 chan int, out chan int) {
    for i1 := range in1 {i2 := <- in2; out <- i1 + i2}
}

func main() {
    const N = 1024  // calculate sum of squares up to N; N must be power of 2
    const R = 10  // number of repetitions to fill the "pipe"

    var r [N*2]chan int
    for i := range r {r[i] = make(chan int)}
    var m [N]chan int
    for i := range m {m[i] = make(chan int)}

    for i := 0; i < N; i++ {go mapper(m[i], r[i + N])}
    for i := 1; i < N; i++ {go reducer(r[i * 2], r[i *2 + 1], r[i])}

    go func () {
        for j := 0; j < R; j++ {
            for i := 0; i < N; i++ {m[i] <- i + 1} 
        }
    } ()

    for j := 0; j < R; j++ {
        <- r[1]
    } 
}

Вопрос в том, как эффективно реализовать этот пример mapreduce в Haskell. Следующий код на Haskell пытается вычислить 10 ^ 2 + 7 ^ 2 в основной функции. Мой вопрос заключается в том, как создать массив (или список) каналов, например Go, а затем соединить потоки мапперов и редукторов вместе в основной функции.

import Control.Concurrent
data MRchannel = MRchannel !(MVar MRcmd)
data MRcmd = Pass !Int | Add !Int
  deriving (Show)

mapper:: MRchannel -> MRchannel -> IO ()
mapper left_C@(MRchannel left) right_C@(MRchannel right) = do
    v <- takeMVar left
    case v of
        Pass x -> do
            putMVar right (Add (x*x))
            mapper left_C right_C
        otherwise -> do
            putStrLn "Error!"
            return ()

reducer::  MRchannel -> MRchannel -> MRchannel -> IO ()
reducer left_1_C@(MRchannel left_1) left_2_C@(MRchannel left_2) 
right_C@(MRchannel right) = do
    v1 <- takeMVar left_1
    case v1 of
        Add x1 -> do
            v2 <- takeMVar left_2
            case v2 of
                Add x2 -> do 
                    putMVar right (Add (x1+x2))
                    reducer left_1_C left_2_C right_C
                otherwise -> do
                    putStrLn "Error!"
                    return ()
        otherwise -> do
            putStrLn "Error!"
            return ()

main = do
m1_l <- newEmptyMVar
m2_l <- newEmptyMVar
r1_l1 <- newEmptyMVar
r1_l2 <- newEmptyMVar
r1_r <- newEmptyMVar
    let m1_input = MRchannel m1_l
    let m2_input = MRchannel m2_l
    let r1_input1 = MRchannel r1_l1
    let r1_input2 = MRchannel r1_l2
    let r1_output = MRchannel r1_r
    forkIO $ mapper m1_input r1_input1
    forkIO $ mapper m2_input r1_input2
    forkIO $ reducer r1_input1 r1_input2 r1_output

    putMVar m1_l (Pass 10)
    putMVar m2_l (Pass 7)

    y <- takeMVar r1_r
    case y of 
        Add kvalue  -> do
            putStrLn $ show kvalue
        otherwise -> do
            putStrLn "Error"
            return () 

Ответы на вопрос(3)

Ваш ответ на вопрос