@chi Большое спасибо. Я изменил свой код в соответствии с вашими предложениями. Он работает в 10 раз быстрее, чем предыдущая реализация.
аюсь сравнить Haskell с Go для некоторых примеров параллелизма. Следующий код представляет собой простой пример сокращения карт в Go с использованием распорядителей и каналов. Следующий код Go вычисляет сумму квадратов:
2 ^ 1 + 2 ^ 2 + 3 ^ 2 .... 1024 ^ 2
Чтобы проверить производительность Go и Haskell, я повторяю вычисление суммы квадратов для R раз (10).
package main
import "fmt"
func mapper(in chan int, out chan int) {
for v := range in {out <- v*v}
}
func reducer(in1, in2 chan int, out chan int) {
for i1 := range in1 {i2 := <- in2; out <- i1 + i2}
}
func main() {
const N = 1024 // calculate sum of squares up to N; N must be power of 2
const R = 10 // number of repetitions to fill the "pipe"
var r [N*2]chan int
for i := range r {r[i] = make(chan int)}
var m [N]chan int
for i := range m {m[i] = make(chan int)}
for i := 0; i < N; i++ {go mapper(m[i], r[i + N])}
for i := 1; i < N; i++ {go reducer(r[i * 2], r[i *2 + 1], r[i])}
go func () {
for j := 0; j < R; j++ {
for i := 0; i < N; i++ {m[i] <- i + 1}
}
} ()
for j := 0; j < R; j++ {
<- r[1]
}
}
Вопрос в том, как эффективно реализовать этот пример mapreduce в Haskell. Следующий код на Haskell пытается вычислить 10 ^ 2 + 7 ^ 2 в основной функции. Мой вопрос заключается в том, как создать массив (или список) каналов, например Go, а затем соединить потоки мапперов и редукторов вместе в основной функции.
import Control.Concurrent
data MRchannel = MRchannel !(MVar MRcmd)
data MRcmd = Pass !Int | Add !Int
deriving (Show)
mapper:: MRchannel -> MRchannel -> IO ()
mapper left_C@(MRchannel left) right_C@(MRchannel right) = do
v <- takeMVar left
case v of
Pass x -> do
putMVar right (Add (x*x))
mapper left_C right_C
otherwise -> do
putStrLn "Error!"
return ()
reducer:: MRchannel -> MRchannel -> MRchannel -> IO ()
reducer left_1_C@(MRchannel left_1) left_2_C@(MRchannel left_2)
right_C@(MRchannel right) = do
v1 <- takeMVar left_1
case v1 of
Add x1 -> do
v2 <- takeMVar left_2
case v2 of
Add x2 -> do
putMVar right (Add (x1+x2))
reducer left_1_C left_2_C right_C
otherwise -> do
putStrLn "Error!"
return ()
otherwise -> do
putStrLn "Error!"
return ()
main = do
m1_l <- newEmptyMVar
m2_l <- newEmptyMVar
r1_l1 <- newEmptyMVar
r1_l2 <- newEmptyMVar
r1_r <- newEmptyMVar
let m1_input = MRchannel m1_l
let m2_input = MRchannel m2_l
let r1_input1 = MRchannel r1_l1
let r1_input2 = MRchannel r1_l2
let r1_output = MRchannel r1_r
forkIO $ mapper m1_input r1_input1
forkIO $ mapper m2_input r1_input2
forkIO $ reducer r1_input1 r1_input2 r1_output
putMVar m1_l (Pass 10)
putMVar m2_l (Pass 7)
y <- takeMVar r1_r
case y of
Add kvalue -> do
putStrLn $ show kvalue
otherwise -> do
putStrLn "Error"
return ()