[erlang-questions] parallelize reduce step

Alex Dashevski alexd555@REDACTED
Tue Jul 3 09:29:25 CEST 2018


Hi,

I want to parallelize the reduce step:

-module(phofs).
-export([mapreduce/4]).
-import(lists, [foreach/2]).
%% F1(Pid, X) -> sends {Key,Val} messages to Pid
%% F2(Key, [Val], AccIn) -> AccOut
%% Runs the map-reduce over L in a separate reducer process and returns the
%% final accumulator produced by folding F2 over the grouped {Key, Val} pairs.
%% The reducer is monitored rather than spawned bare: if it dies before
%% replying (e.g. F2 raises), the caller fails with {reduce_failed, Reason}
%% instead of blocking forever in receive.
mapreduce(F1, F2, Acc0, L) ->
    Parent = self(),
    {Pid, Ref} = spawn_monitor(fun() -> reduce(Parent, F1, F2, Acc0, L) end),
    receive
        {Pid, Result} ->
            %% The reducer terminates right after sending its reply; remove
            %% the monitor and discard any 'DOWN' already queued for it.
            erlang:demonitor(Ref, [flush]),
            Result;
        {'DOWN', Ref, process, Pid, Reason} ->
            erlang:error({reduce_failed, Reason})
    end.
%% Body of the reducer process. Starts one linked mapper per element of L,
%% each running F1(ReducerPid, X); gathers the {Key, Val} messages they send;
%% folds F2 over the resulting Key -> [Val] dict starting from Acc0; and
%% sends {ReducerPid, Acc} back to Parent.
reduce(Parent, F1, F2, Acc0, L) ->
    %% Trapping exits turns each mapper's termination into an 'EXIT' message,
    %% which collect_replies/2 counts to know when every mapper is finished.
    process_flag(trap_exit, true),
    Me = self(),
    %% Me must be captured here: inside the mapper, self() is the mapper.
    lists:foreach(fun(Elem) -> spawn_link(fun() -> F1(Me, Elem) end) end, L),
    Grouped = collect_replies(length(L), dict:new()),
    Parent ! {Me, dict:fold(F2, Acc0, Grouped)}.
%% Collects {Key, Val} messages into Dict, which maps each Key to the list of
%% its Vals in arrival order, and returns the dict once N reaches 0.
%% N is the number of mappers still running. Each {'EXIT', _, _} message
%% (delivered when the calling process traps exits) decrements it.
%% Signals between two processes keep their send order, so every {Key, Val}
%% a mapper sends itself is received before that mapper's 'EXIT'.
collect_replies(0, Dict) ->
    Dict;
collect_replies(N, Dict) ->
    receive
        {Key, Val} ->
            %% dict:append/3 stores [Val] when Key is absent and appends
            %% otherwise, so no separate is_key/store branch is needed.
            collect_replies(N, dict:append(Key, Val, Dict));
        {'EXIT', _Pid, _Why} ->
            collect_replies(N - 1, Dict)
    end.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20180703/c8384a57/attachment.htm>


More information about the erlang-questions mailing list