<div dir="rtl"><div dir="ltr">Hi,</div><div dir="ltr"><br></div><div dir="ltr">I want to parallelize reduce step:</div><div dir="ltr"><br></div><div dir="ltr">-module(phofs).<br>-export([mapreduce/4]).<br>-import(lists, [foreach/2]).<br>%% F1(Pid, X) -> sends {Key,Val} messages to Pid<br>%% F2(Key, [Val], AccIn) -> AccOut<br>mapreduce(F1, F2, Acc0, L) -><br>S = self(),<br>Pid= spawn(fun() -> reduce(S, F1, F2, Acc0, L) end),<br>receive<br>{Pid, Result} ->Result<br>end.<br>reduce(Parent, F1, F2, Acc0, L) -><br>process_flag(trap_exit, true),<br>ReducePid= self(),<br>foreach(fun(X) -><br>spawn_link(fun() -> F1(ReducePid, X) end)<br>end, L),<br>N = length(L),<br>Dict0 = dict:new(),<br>Dict1 = collect_replies(N, Dict0),<br>Acc= dict:fold(F2, Acc0, Dict1),<br>Parent ! {self(), Acc}.<br>collect_replies(0, Dict) -><br>Dict;<br>collect_replies(N, Dict) -><br>receive<br>{Key, Val} -><br>case dict:is_key(Key, Dict) of<br>true ->Dict1 = dict:append(Key, Val, Dict),<br>collect_replies(N, Dict1);<br>false ->Dict1 = dict:store(Key,[Val], Dict),<br>collect_replies(N, Dict1)<br>end;<br>{'EXIT', _, Why} -><br>collect_replies(N-1, Dict)<span><span><span><span><br></span></span></span></span></div></div>