% @author Stephen Marsh % @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com] % @doc plists is a drop-in replacement for module % lists, % making most list operations parallel. It can operate on each element in % parallel, for IO-bound operations, on sublists in parallel, for % taking advantage of multi-core machines with CPU-bound operations, and % across Erlang nodes, for parallelizing inside a cluster. It handles % errors and node failures. It can be configured, tuned, and tweaked to % get optimal performance while minimizing overhead. % % Almost all the functions are % identical to equivalent functions in lists, returning exactly the same % result, and having both a form with an identical syntax that operates on % each element in parallel and a form which takes an optional "malt", % a specification for how to parallelize the operation. % % fold is the one exception: parallel fold is different from linear fold. % This module also includes a simple mapreduce implementation, and the % function runmany. All the other functions are implemented with runmany, % which is a generalization of parallel list operations. % % == Malts == % A malt specifies how to break a list into sublists, and can optionally % specify a timeout, which nodes to run on, and how many processes to start % per node. % % Malt = MaltComponent | [MaltComponent]
% MaltComponent = SubListSize::integer() | {processes, integer()} | % {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}
% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} % % An integer can be given to specify the exact size for % sublists. 1 is a good choice for IO-bound operations and when % the operation on each list element is expensive. Larger numbers % minimize overhead and are faster for cheap operations. % % If the integer is omitted, and % you have specified a {processes, X}, the list is % split into X sublists. This is only % useful when the time to process each element is close to identical and you % know exactly how many lines of execution are available to you. % % If neither of the above applies, the sublist size defaults to 1. % % You can use {processes, X} to have the list processed % by X processes on the local machine. A good choice for X is the number of % lines of execution (cores) the machine provides. % % {timeout, Milliseconds} specifies a timeout. This is a timeout for the entire % operation, both operating on the sublists and combining the results. % exit(timeout) is evaluated if the timeout is exceeded. % % {nodes, NodeList} specifies that the operation should be done across nodes. % Every element of NodeList is of the form {NodeName, NumProcesses} or % NodeName, which means the same as {NodeName, 1}. plists runs % NumProcesses processes on NodeName concurrently. A good choice for % NumProcesses is the number of lines of execution (cores) a machine provides % plus one. This ensures the machine is completely busy even when % fetching a new sublist. plists is able to recover if a node goes down. % If all nodes go down, exit(allnodescrashed) is evaluated. % % Any of the above may be used as a malt, or may be combined into a list. % {nodes, NodeList} and {processes, X} may not be combined. % % === Examples === % % start a process for each element (1-element sublists)
% 1 % % % start a process for every ten elements (10-element sublists)
% 10 % % % split the list into two sublists and process in two processes
% {processes, 2} % % % split the list into 10-element sublists and process in two processes
% [10, {processes, 2}] % % % timeout after one second. Assumes that a process should be started
% % for each element.
% {timeout, 1000} % % % Runs 3 processes at a time on apple@desktop, % and 2 on orange@laptop
% % This is the best way to utilize all the CPU-power of a dual-core
% % desktop and a single-core laptop. Assumes that the list should be
% % split into 1-element sublists.
% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]} % % % Gives apple and orange three seconds to process the list as
% % 100-element sublists.
% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}] % % === Aside: Why Malt? === % I needed a word for this concept, so maybe my subconscious gave me one by % making me misspell multiply. Maybe it is an acronym for Malt is A List % Tearing Specification. Maybe it is a beer metaphor, suggesting that code % only runs in parallel if bribed with spirits. It's jargon: learn it, % or you can't be part of the in-group. % % == Messages and Errors == % plists ensures that no extraneous messages are left in or will later % enter the message queue. This is guaranteed even in the event of an error. % % Errors in spawned processes are caught and propagated to the calling % process. If you invoke % % plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]). % % you get a badarith error, exactly like when you use lists:map. % % plists uses monitors to watch the processes it spawns. It is not a good idea % to invoke plists when you are already monitoring processes. If one of them % does a non-normal exit, plists receives the 'DOWN' message believing it to be % from one of its own processes. The error propagation system goes into % effect, which results in the error occurring in the calling process. % % == License == % The MIT License % % Copyright (c) 2007 Stephen Marsh % % Permission is hereby granted, free of charge, to any person obtaining a copy % of this software and associated documentation files (the "Software"), to deal % in the Software without restriction, including without limitation the rights % to use, copy, modify, merge, publish, distribute, sublicense, and/or sell % copies of the Software, and to permit persons to whom the Software is % furnished to do so, subject to the following conditions: % % The above copyright notice and this permission notice shall be included in % all copies or substantial portions of the Software. % % THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR % IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, % FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE % AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER % LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, % OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN % THE SOFTWARE. -module(plists). -export([all/2, all/3, any/2, any/3, filter/2, filter/3, fold/3, fold/4, fold/5, foreach/2, foreach/3, map/2, map/3, partition/2, partition/3, sort/1, sort/2, sort/3, usort/1, usort/2, usort/3, mapreduce/2, mapreduce/3, runmany/3, runmany/4]). % Everything here is defined in terms of runmany. % The following functions are convenient interfaces to runmany. % @doc Same semantics as in module % lists. % @spec (Fun, List) -> bool() all(Fun, List) -> all(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> bool() all(Fun, List, Malt) -> try runmany(fun (L) -> B = lists:all(Fun, L), if B -> nil; true -> exit(notall) end end, fun (_A1, _A2) -> nil end, List, Malt) of _ -> true catch exit:notall -> false end. % @doc Same semantics as in module % lists. % @spec (Fun, List) -> bool() any(Fun, List) -> any(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> bool() any(Fun, List, Malt) -> try runmany(fun (L) -> B = lists:any(Fun, L), if B -> exit(any); true -> nil end end, fun (_A1, _A2) -> nil end, List, Malt) of _ -> false catch exit:any -> true end. % @doc Same semantics as in module % lists.
% @spec (Fun, List) -> list() filter(Fun, List) -> filter(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> list() filter(Fun, List, Malt) -> runmany(fun (L) -> lists:filter(Fun, L) end, fun (A1, A2) -> A1 ++ A2 end, List, Malt). % Note that with parallel fold there is no foldl or foldr; % instead there is just one fold that can fuse accumulators. % @doc Like below, but assumes 1 as the Malt. This function is almost useless, % and is intended only to aid converting code from using lists to plists. % @spec (Fun, InitAcc, List) -> term() fold(Fun, InitAcc, List) -> fold(Fun, Fun, InitAcc, List, 1). % @doc Like below, but uses the Fun as the Fuse by default. % @spec (Fun, InitAcc, List, Malt) -> term() fold(Fun, InitAcc, List, Malt) -> fold(Fun, Fun, InitAcc, List, Malt). % @doc fold is more complex when made parallel. There is no foldl or foldr; % accumulators aren't passed in any defined order. % The list is split into sublists which are folded together. Fun is % identical to the function passed to lists:fold[lr]: it takes % an element and the accumulator, and returns a new accumulator. % It is used for the initial stage of folding sublists. Fuse fuses together % the results: it takes (Result1, Result2) and returns a new result. % By default sublists are fused left to right, each result of a fuse being % fed into the first element of the next fuse. The result of the last fuse % is the result. % % Fusing may also run in parallel using a recursive algorithm, % by specifying the fuse as {recursive, Fuse}. See % the discussion in {@link runmany/4}. % % Malt is the malt for the initial folding of sublists, and for the % possible recursive fuse. % @spec (Fun, Fuse, InitAcc, List, Malt) -> term() fold(Fun, Fuse, InitAcc, List, Malt) -> Fun2 = fun (L) -> lists:foldl(Fun, InitAcc, L) end, runmany(Fun2, Fuse, List, Malt). % @doc Same semantics as in module % lists. % @spec (Fun, List) -> void() foreach(Fun, List) -> foreach(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> void() foreach(Fun, List, Malt) -> runmany(fun (L) -> lists:foreach(Fun, L) end, fun (_A1, _A2) -> ok end, List, Malt). % @doc Same semantics as in module % lists. % @spec (Fun, List) -> list() map(Fun, List) -> map(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> list() map(Fun, List, Malt) -> runmany(fun (L) -> lists:map(Fun, L) end, fun (A1, A2) -> A1 ++ A2 end, List, Malt). % @doc Same semantics as in module % lists. % @spec (Fun, List) -> {list(), list()} partition(Fun, List) -> partition(Fun, List, 1). % @doc Same semantics as in module % lists. % @spec (Fun, List, Malt) -> {list(), list()} partition(Fun, List, Malt) -> runmany(fun (L) -> lists:partition(Fun, L) end, fun ({True1, False1}, {True2, False2}) -> {True1 ++ True2, False1 ++ False2} end, List, Malt). % SORTMALT needs to be tuned -define(SORTMALT, 100). % @doc Same semantics as in module % lists. % @spec (List) -> list() sort(List) -> sort(fun (A, B) -> A =< B end, List). % @doc Same semantics as in module % lists. % @spec (Fun, List) -> list() sort(Fun, List) -> sort(Fun, List, ?SORTMALT). % @doc This version lets you specify your own malt for sort. % % sort splits the list into sublists and sorts them, and it merges the % sorted lists together. These are done in parallel. Each sublist is % sorted in a separate process, and each merging of results is done in a % separate process.
Malt defaults to 100, causing the list to be split into % 100-element sublists. % @spec (Fun, List, Malt) -> list() sort(Fun, List, Malt) -> Fun2 = fun (L) -> lists:sort(Fun, L) end, Fuse = fun (A1, A2) -> lists:merge(Fun, A1, A2) end, runmany(Fun2, {recursive, Fuse}, List, Malt). % @doc Same semantics as in module % lists. % @spec (List) -> list() usort(List) -> usort(fun (A, B) -> A =< B end, List). % @doc Same semantics as in module % lists. % @spec (Fun, List) -> list() usort(Fun, List) -> usort(Fun, List, ?SORTMALT). % @doc This version lets you specify your own malt for usort. % % usort splits the list into sublists and sorts them, and it merges the % sorted lists together. These are done in parallel. Each sublist is % sorted in a separate process, and each merging of results is done in a % separate process. Malt defaults to 100, causing the list to be split into % 100-element sublists. % % usort removes duplicate elements while it sorts. % @spec (Fun, List, Malt) -> list() usort(Fun, List, Malt) -> Fun2 = fun (L) -> lists:usort(Fun, L) end, Fuse = fun (A1, A2) -> lists:umerge(Fun, A1, A2) end, runmany(Fun2, {recursive, Fuse}, List, Malt). % @doc Like below, but assumes the default MapMalt of 1. % @spec (MapFunc, List) -> Dict % MapFunc = (term()) -> DeepListOfKeyValuePairs % DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} mapreduce(MapFunc, List) -> mapreduce(MapFunc, List, 1). % @doc This is a very basic mapreduce. You won't write a Google-rivaling % search engine with it. It has no equivalent in lists. Each % element in the list is run through the MapFunc, which produces either % a {Key, Value} pair, or a list of key-value pairs, or a list of lists of % key-value pairs, etc. A reducer process runs in parallel with the mapping % processes, collecting the key-value pairs into a % dict. This dict % is returned as the result. % % MapMalt is the malt for the mapping operation, with a default value of 1, % meaning each element of the list is mapped by a separate process. % % mapreduce requires OTP R11B; on older releases it may leave monitoring messages in the % message queue. % @spec (MapFunc, List, MapMalt) -> Dict % MapFunc = (term()) -> DeepListOfKeyValuePairs % DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} mapreduce(MapFunc, List, MapMalt) -> Parent = self(), {Reducer, ReducerRef} = erlang:spawn_monitor(fun () -> reducer(Parent, dict:new(), 0) end), MapFunc2 = fun (L) -> Reducer ! lists:map(MapFunc, L), 1 end, SentMessages = try runmany(MapFunc2, fun (A, B) -> A+B end, List, MapMalt) catch exit:Reason -> erlang:demonitor(ReducerRef, [flush]), Reducer ! die, exit(Reason) end, Reducer ! {done, SentMessages}, Results = receive {Reducer, Results2} -> Results2; {'DOWN', _, _, Reducer, Reason2} -> exit(Reason2) end, receive {'DOWN', _, _, Reducer, normal} -> nil end, Results. reducer(Parent, Dict, NumReceived) -> receive die -> nil; {done, NumReceived} -> Parent ! {self (), Dict}; Keys -> reducer(Parent, add_keys(Dict, Keys), NumReceived + 1) end. add_keys(Dict, [{Key, Value}|Keys]) -> case dict:is_key(Key, Dict) of true -> add_keys(dict:append(Key, Value, Dict), Keys); false -> add_keys(dict:store(Key, [Value], Dict), Keys) end; add_keys(Dict, [List|Keys]) when is_list(List) -> add_keys(add_keys(Dict, List), Keys); add_keys(Dict, []) -> Dict. % @doc Like below, but assumes a Malt of 1, % meaning each element of the list is processed by a separate process. % @spec (Fun, Fuse, List) -> term() runmany(Fun, Fuse, List) -> runmany(Fun, Fuse, List, 1).
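% A brief usage sketch (the values here are illustrative, not taken from the
% original documentation). mapreduce collects every emitted {Key, Value} pair
% into a dict whose values are lists, so counting occurrences of each word
% looks like this:
%
%   Dict = plists:mapreduce(fun (Word) -> {Word, 1} end, [apple, pear, apple]),
%   [1, 1] = dict:fetch(apple, Dict).
%
% fold/4 uses Fun as the Fuse as well; summing a list in 2-element sublists,
% for example:
%
%   6 = plists:fold(fun erlang:'+'/2, 0, [1, 2, 3], 2).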
% Begin internal stuff (though runmany/4 is exported). % @doc All of the other functions are implemented with runmany. runmany % takes a List, splits it into sublists, and starts processes to operate on % each sublist, all done according to Malt. Each process passes its sublist % into Fun and sends the result back. % % The results are then fused together to get the final result. There are two % ways this can operate, linearly and recursively. If Fuse is a function, % the fuse is done linearly left-to-right on the sublists: the results % of processing the first and second sublists are passed to Fuse, then % the result of that fuse and the result of processing the third sublist, and so on. % This method preserves the original order of the list's elements. % % To do a recursive fuse, pass Fuse as {recursive, FuseFunc}. % The recursive fuse makes no guarantee about the order in which the results of % sublists, or the results of fuses, are passed to FuseFunc. It % continues fusing pairs of results until it is down to one. % % Recursive fuse is done in parallel with processing the sublists, and a % process is spawned to fuse each pair of results. It is a parallelized % algorithm. Linear fuse is done after all results of processing sublists % have been collected, and can only run in a single process. % % Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if % the malt contains {nodes, NodeList} or {processes, X}. If this is not the % case, a linear fuse is done. % @spec (Fun, Fuse, List, Malt) -> term() % Fun = (list()) -> term() % Fuse = FuseFunc | {recursive, FuseFunc} % FuseFunc = (term(), term()) -> term() runmany(Fun, Fuse, List, Malt) when is_list(Malt) -> % If the sublist size wasn't specified, assume a default. case lists:any(fun (X) when is_integer(X) -> true; (_) -> false end, Malt) of true -> runmany(Fun, Fuse, List, local, Malt); false -> case lists:foldl(fun({processes, X}, _A) -> X; (_, A) -> A end, no_processes, Malt) of no_processes -> runmany(Fun, Fuse, List, local, [1|Malt]); X -> % split list into X sublists. L = length(List), case L rem X of 0 -> runmany(Fun, Fuse, List, local, [L div X|Malt]); _ -> runmany(Fun, Fuse, List, local, [L div X + 1|Malt]) end end end; runmany(Fun, Fuse, List, Malt) -> runmany(Fun, Fuse, List, [Malt]). runmany(Fun, Fuse, List, Nodes, [MaltTerm|Malt]) when is_integer(MaltTerm) -> runmany(Fun, Fuse, splitmany(List, MaltTerm), Nodes, Malt); % run X processes on the local machine runmany(Fun, Fuse, List, local, [{processes, X}|Malt]) -> Nodes = lists:duplicate(X, node()), runmany(Fun, Fuse, List, Nodes, Malt); runmany(Fun, Fuse, List, Nodes, [{timeout, X}|Malt]) -> Parent = self(), Timer = spawn(fun () -> receive stoptimer -> Parent ! {timerstopped, self()} after X -> Parent ! {timerrang, self()}, receive stoptimer -> Parent ! {timerstopped, self()} end end end), Ans = try runmany(Fun, Fuse, List, Nodes, Malt) catch % we really just want the after block, the syntax % makes this catch necessary. willneverhappen -> nil after Timer ! stoptimer, cleanup_timer(Timer) end, Ans; runmany(Fun, Fuse, List, local, [{nodes, NodeList}|Malt]) -> Nodes = lists:foldl(fun ({Node, X}, A) -> lists:reverse(lists:duplicate(X, Node), A); (Node, A) -> [Node|A] end, [], NodeList), runmany(Fun, Fuse, List, Nodes, Malt); % local recursive fuse, for when we weren't invoked with {processes, X} % or {nodes, NodeList}. Degenerates recursive fuse into linear fuse.
runmany(Fun, {recursive, Fuse}, List, local, []) -> runmany(Fun, Fuse, List, local, []); runmany(Fun, Fuse, List, local, []) -> local_runmany(Fun, Fuse, List); runmany(Fun, Fuse, List, Nodes, []) -> cluster_runmany(Fun, Fuse, List, Nodes). cleanup_timer(Timer) -> receive {timerrang, Timer} -> cleanup_timer(Timer); {timerstopped, Timer} -> nil end. % local runmany, for when we weren't invoked with {processes, X} % or {nodes, NodeList}. Every sublist is processed in parallel. local_runmany(Fun, Fuse, List) -> Parent = self (), Pids = lists:map(fun (L) -> F = fun () -> Parent ! {self (), Fun(L)} end, {Pid, _} = erlang:spawn_monitor(F), Pid end, List), Answers = try lists:map(fun receivefrom/1, Pids) catch throw:Message -> {BadPid, Reason} = Message, handle_error(BadPid, Reason, Pids) end, lists:foreach(fun (Pid) -> normal_cleanup(Pid) end, Pids), fuse(Fuse, Answers). receivefrom(Pid) -> receive {Pid, R} -> R; {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> throw({BadPid, Reason}); {timerrang, _} -> throw({nil, timeout}) end. % Convert List into [{Number, Sublist}] cluster_runmany(Fun, Fuse, List, Nodes) -> {List2, _} = lists:foldl(fun (X, {L, Count}) -> {[{Count, X}|L], Count+1} end, {[], 0}, List), cluster_runmany(Fun, Fuse, List2, Nodes, [], []). % Add a pair of results into the TaskList as a fusing task cluster_runmany(Fun, {recursive, Fuse}, [], Nodes, Running, [{_, R1}, {_, R2}|Results]) -> cluster_runmany(Fun, {recursive, Fuse}, [{fuse, R1, R2}], Nodes, Running, Results); % recursive fuse done, return result cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], [{_, Result}]) -> Result; % edge case where we are asked to do nothing cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], []) -> []; % We're done, now we just have to [linear] fuse the results cluster_runmany(_, Fuse, [], _Nodes, [], Results) -> fuse(Fuse, lists:map(fun ({_, R}) -> R end, lists:sort(fun ({A, _}, {B, _}) -> A =< B end, lists:reverse(Results)))); % We have a ready node and a sublist or fuse to be processed, so we start % a new process cluster_runmany(Fun, Fuse, [Task|TaskList], [N|Nodes], Running, Results) -> Parent = self(), case Task of {Num, L2} -> Fun2 = fun () -> Parent ! {self(), Num, Fun(L2)} end; {fuse, R1, R2} -> {recursive, FuseFunc} = Fuse, Fun2 = fun () -> Parent ! 
{self(), junknum, FuseFunc(R1, R2)} end end, Pid = spawn(N, Fun2), erlang:monitor(process, Pid), cluster_runmany(Fun, Fuse, TaskList, Nodes, [{Pid, N, Task}|Running], Results); % We can't start a new process, but can watch over already running ones cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Running) > 0 -> receive {Pid, Num, Result} -> % throw out the normal exit message receive {'DOWN', _, _, Pid, normal} -> nil; {'DOWN', _, _, Pid, noproc} -> nil end, {Running2, FinishedNode, _} = delete_running(Pid, Running, []), cluster_runmany(Fun, Fuse, TaskList, [FinishedNode|Nodes], Running2, [{Num, Result}|Results]); {timerrang, _} -> RunningPids = lists:map(fun ({Pid, _, _}) -> Pid end, Running), handle_error(nil, timeout, RunningPids); % node failure {'DOWN', _, _, Pid, noconnection} -> {Running2, _DeadNode, Task} = delete_running(Pid, Running, []), cluster_runmany(Fun, Fuse, [Task|TaskList], Nodes, Running2, Results); {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> RunningPids = lists:map(fun ({Pid, _, _}) -> Pid end, Running), handle_error(BadPid, Reason, RunningPids) end; % We have data, but no nodes either available or occupied cluster_runmany(_, _, [_Non|_Empty], []=_Nodes, []=_Running, _) -> exit(allnodescrashed). delete_running(Pid, [{Pid, Node, List}|Running], Acc) -> {Running ++ Acc, Node, List}; delete_running(Pid, [R|Running], Acc) -> delete_running(Pid, Running, [R|Acc]). handle_error(BadPid, Reason, Pids) -> lists:foreach(fun (Pid) -> exit(Pid, siblingdied) end, Pids), lists:foreach(fun (Pid) -> error_cleanup(Pid, BadPid) end, Pids), exit(Reason). error_cleanup(BadPid, BadPid) -> ok; error_cleanup(Pid, BadPid) -> receive {Pid, _} -> error_cleanup(Pid, BadPid); {Pid, _, _} -> error_cleanup(Pid, BadPid); {'DOWN', _, _, Pid, _Reason} -> ok end. normal_cleanup(Pid) -> receive {'DOWN', _, _, Pid, _Reason} -> ok end. fuse(Fuse, [R1|Results]) -> fuse(Fuse, Results, R1); fuse(_, []) -> []. fuse(Fuse, [R2|Results], R1) -> fuse(Fuse, Results, Fuse(R1, R2)); fuse(_, [], R) -> R. % Splits a list into a list of sublists, each of size Size, % except for the last sublist, which is smaller if the original list % could not be evenly divided into Size-sized lists. splitmany(List, Size) -> splitmany(List, [], Size). splitmany([], Acc, _) -> lists:reverse(Acc); splitmany(List, Acc, Size) -> {Top, NList} = split(Size, List), splitmany(NList, [Top|Acc], Size). % Like lists:split, except it can split a list that is shorter than its first % parameter split(Size, List) -> split(Size, List, []). split(0, List, Acc) -> {lists:reverse(Acc), List}; split(Size, [H|List], Acc) -> split(Size - 1, List, [H|Acc]); split(_, [], Acc) -> {lists:reverse(Acc), []}.
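% A couple of illustrative evaluations of the helpers above, derived from the
% definitions (they are not part of the original documentation):
%
%   splitmany([1, 2, 3, 4, 5], 2) -> [[1, 2], [3, 4], [5]]
%   split(2, [1]) -> {[1], []}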