%% @copyright 2007 Hynek Vychodil
%% @author Hynek Vychodil
%%   [http://pichis_blog.blogspot.com/]
%% @version 0.0.1
%% @end
%% =====================================================================
%% @doc Single node map-reduce and fold-reduce for file like data sources
%%
%% A fixed number of linked worker processes repeatedly call a shared
%% zero-arity reader fun, map every block it yields, and fold their own
%% results with the reduce fun. The calling process collects one reply per
%% worker and reduces those replies into the final answer.

-module(file_map_reduce).

-export([map_reduce/2, map_reduce/3]).

-define(BestProc, erlang:system_info(schedulers)).

-type reader() :: fun(() -> {ok, term()} | eof).
-type job() :: {fun((term()) -> term()), fun((term(), term()) -> term())}.

%% @doc Same as map_reduce/3 with one worker per scheduler.
-spec map_reduce(reader(), job()) -> none | {result, term()}.
map_reduce(F, MR) ->
    map_reduce(F, MR, ?BestProc).

%% @doc Runs `{Map, Reduce}' over everything `FileReader' yields, using `N'
%% worker processes.
%%
%% `FileReader' is called from the worker processes (possibly concurrently)
%% and must return `{ok, Block}' or `eof'. Returns `{result, R}', or `none'
%% when the reader produced no blocks at all. Results are combined in
%% arrival order, which is not deterministic across workers.
%%
%% Workers are linked to the caller, so an abnormal worker exit propagates
%% to the caller unless it traps exits.
%%
%% `N' must be a positive integer. Without the integer check an atom (which
%% compares greater than any number) would pass `N > 0' and make
%% spawn_workers/3 spawn without bound.
-spec map_reduce(reader(), job(), pos_integer()) -> none | {result, term()}.
map_reduce(FileReader, {Map, Reduce} = MapReduce, N) when
    is_integer(N),
    N > 0,
    is_function(FileReader, 0),
    is_function(Map, 1),
    is_function(Reduce, 2)
->
    Parent = self(),
    %% Unique tag for this run: only replies carrying it are counted, so
    %% unrelated `none' / `{result, _}' messages already sitting in the
    %% caller's mailbox cannot be mistaken for worker replies.
    Ref = make_ref(),
    Worker = fun() -> worker(FileReader, MapReduce, Parent, Ref) end,
    spawn_workers(Worker, 0, N),
    wait(Ref, Reduce, [], 0, N).

%% Spawns `Max - N' linked processes, each running `Worker'.
spawn_workers(Worker, N, Max) when N < Max ->
    spawn_link(Worker),
    spawn_workers(Worker, N + 1, Max);
spawn_workers(_, N, N) ->
    ok.

%% Collects one reply from each of `Procs' workers; `N' counts the replies
%% received so far. Whenever no reply is immediately available (`after 0'),
%% the results gathered so far are pre-reduced with one reduce/2 pass before
%% blocking for the next reply.
wait(Ref, Reduce, Results, N, Procs) when N < Procs ->
    receive
        {Ref, {result, Result}} ->
            wait(Ref, Reduce, [Result | Results], N + 1, Procs);
        {Ref, none} ->
            wait(Ref, Reduce, Results, N + 1, Procs)
    after 0 ->
        Results1 = reduce(Reduce, Results), % root of evil
        receive
            {Ref, {result, Result}} ->
                wait(Ref, Reduce, [Result | Results1], N + 1, Procs);
            {Ref, none} ->
                wait(Ref, Reduce, Results1, N + 1, Procs)
        end
    end;
wait(_Ref, Reduce, Results, N, N) ->
    reduce_all(Reduce, Results).

%% One pairwise pass: combines adjacent elements two by two. A trailing
%% unpaired element (or an empty list) is returned unchanged.
reduce(Reduce, [R1, R2 | T]) ->
    [Reduce(R1, R2) | reduce(Reduce, T)];
reduce(_, L) ->
    L.

%% Repeats reduce/2 passes until at most one element is left.
%% Returns `none' for an empty list, `{result, R}' otherwise.
reduce_all(_, []) ->
    none;
reduce_all(_, [R]) ->
    {result, R};
reduce_all(Reduce, L) ->
    reduce_all(Reduce, reduce(Reduce, L)).

%% Worker entry point: reads the first block and enters the read/map/reduce
%% loop. `M' is the pid of the collecting process, `Ref' the tag of this run.
worker(Read, MapReduce, M, Ref) ->
    worker(Read, MapReduce, M, Ref, Read(), []).

%% Maps each block, folds it into the accumulator `L' with one reduce/2
%% pass, and on `eof' sends the fully reduced accumulator to `M'.
worker(Read, {Map, Reduce} = MapReduce, M, Ref, {ok, B}, L) ->
    worker(Read, MapReduce, M, Ref, Read(), reduce(Reduce, [Map(B) | L]));
worker(_, {_, Reduce}, M, Ref, eof, L) ->
    M ! {Ref, reduce_all(Reduce, L)}.