[erlang-questions] sending data down the wire in mysql vs. mnesia
Paul Mineiro
paul-trapexit@REDACTED
Mon Dec 3 19:08:29 CET 2007
That do_select () code is very interesting.
Attached is the kind of thing I'm trying to do right now. I don't see
anything in the mnesia access API which corresponds: I need to reduce the
number of results prior to sending across the wire. The mnesia access API
seems to be geared towards emulating a table (which is quite reasonable!).
I suppose I could overload the concept of mnesia:read () so that reading
special keys has the effect of computing an aggregate via map-reduce. Is
this what you had in mind?
I think I should be able to gussy up the attached code using knowledge
gleaned from understanding do_select () to at least get something like a
single transactional context for this operation (I only really use it with
ets context anyway, but it's fun to think about).
Thanks!
-- p
On Mon, 3 Dec 2007, Ulf Wiger wrote:
> If you look into the mnesia_frag module, you will see how
> mnesia does distributed select (mnesia_frag:do_select()).
>
> In order to sustain this level of trickery, one must know
> exactly how mnesia transactions work, but my suggestion
> was basically that you make a copy of mnesia_frag and
> extend this code.
>
> BR,
> Ulf W
>
>
> 2007/12/3, Paul Mineiro <paul-trapexit@REDACTED>:
> > The mnesia access API seems too low level to pull this off, since it
> > intercepts individual reads and writes.
> >
> > What I'd like is something like rpc:pmap/3 for mnesia (fragmented tables);
> > in this case I can't hand roll it because the transaction context does not
> > carry over through an rpc:call (and it's not clear how the side effect of
> > doing an rpc call interacts with the transaction retry mechanism anyway;
> > seems like mnesia should be aware of this explicitly for it to work out).
> >
> > What I can do right now is use the rpc module to call mnesia:activity for
> > me on the remote nodes, and then reduce the collection of results. This is
> > ok but 1) I have to figure out where the active replicas are for the
> > fragments I care about so location transparency is gone and 2) I can't
> > have a single transaction for all the activity.
> >
> > -- p
> >
> > On Sun, 2 Dec 2007, Ulf Wiger wrote:
> >
> > > 2007/12/1, Paul Mineiro <paul-trapexit@REDACTED>:
> > > > With a fragmented MySQL table, I can execute a stored procedure
> > > > on each MySQL fragment, and then reduce the collection of results in the
> > > > client. The only thing that travels over the wire is the collection of
> > > > results, one for each fragment.
> > > >
> > > > I'm looking for something similar from Mnesia. Erlang has all the
> > > > building blocks to put it together (so I have), but I can see an
> > > > opportunity to come up with something standard within Mnesia which is
> > > > reasonably general and would return me to a more location-transparent
> > > > style of programming.
> > >
> > > You can modify mnesia_frag and tell mnesia to use your version instead
> > > of the default. How to do this is documented in mnesia. The downside is
> > > of course that you'd have to duplicate/maintain a whole bunch of other logic
> > > as well, just to make this small change, but in the process of doing this,
> > > you may arrive at some suggestion on how to make this parameterizable.
> > >
> > > BR,
> > > Ulf W
> > >
> >
> > Optimism is an essential ingredient of innovation. How else can the
> > individual favor change over security?
> >
> > -- Robert Noyce
> > _______________________________________________
> > erlang-questions mailing list
> > erlang-questions@REDACTED
> > http://www.erlang.org/mailman/listinfo/erlang-questions
> >
>
Optimism is an essential ingredient of innovation. How else can the
individual favor change over security?
-- Robert Noyce
-------------- next part --------------
-module (mapreduce).
%-include_lib ("flasscheck/include/quickcheck.hrl").
%-include_lib ("eunit/include/eunit.hrl").
-export ([ mapreduce/6 ]).
%-=====================================================================-
%- Public -
%-=====================================================================-
%% @spec mapreduce (context (), atom (), map (), reduce (), Acc, frag_spec ()) -> { ok, Acc } | { error, Reason }
%%   context () = ets | async_dirty | sync_dirty | transaction | sync_transaction
%%   map () = fun (Table) -> X
%%   reduce () = fun (X, Acc) -> Acc
%%   frag_spec () = all | [ integer () ]
%%   Reason = term ()
%% @doc Applies Map to each selected fragment of BaseTable and folds the
%% per-fragment results together with Reduce, starting from Acc0.
%%
%% For every fragment, Map is run through
%% mnesia:activity (Context, Map, [ Fragment ]) on the node chosen by
%% select_fragments/2, using rpc:async_call/4 so the fragments are
%% processed concurrently. The results are then folded on the calling node
%% with lists:foldl (Reduce, Acc0, Results).
%%
%% Returns { ok, Result } on success. Returns { error, Reason } when the
%% middleman process terminates abnormally; in particular, if any remote
%% call yields { badrpc, Why }, the result is { error, { badrpc, Why } }
%% rather than handing the badrpc tuple to Reduce.
%%
%% transaction and sync_transaction don't make much sense, since
%% transactions would be acquired on fragment-by-fragment basis.
mapreduce (Context, BaseTable, Map, Reduce, Acc0, FragSpec)
  when (Context =:= ets orelse
        Context =:= async_dirty orelse
        Context =:= sync_dirty orelse
        Context =:= transaction orelse
        Context =:= sync_transaction),
       is_atom (BaseTable),
       is_function (Map, 1),
       is_function (Reduce, 2),
       (FragSpec =:= all orelse is_list (FragSpec)) ->
  % c.f. gen_server:do_multi_call/4
  Ref = make_ref (),
  Caller = self (),
  { Receiver, Mref } =
    erlang:spawn_monitor
      (fun () ->
         %% Middleman process. Should be insensitive to regular
         %% exit signals. The synchronization is needed in case
         %% the receiver would exit before the caller started
         %% the monitor.
         process_flag (trap_exit, true),
         CallerMref = erlang:monitor (process, Caller),
         receive
           { Caller, Ref } ->
             %% Fire all remote map calls first, then collect, so the
             %% fragments are processed in parallel.
             Keys =
               [ rpc:async_call (Node,
                                 mnesia,
                                 activity,
                                 [ Context, Map, [ F ] ])
                 || { F, Node } <- select_fragments (BaseTable, FragSpec) ],
             Mapped = [ rpc:yield (K) || K <- Keys ],
             %% A failed remote call must not be reduced as if it were
             %% a map result; report the first failure instead.
             case [ Bad || { badrpc, _ } = Bad <- Mapped ] of
               [] ->
                 exit ({ self (), Ref, lists:foldl (Reduce, Acc0, Mapped) });
               [ FirstBad | _ ] ->
                 exit (FirstBad)
             end;
           { 'DOWN', CallerMref, _, _, _ } ->
             %% Caller died before sending us the go-ahead.
             %% Give up silently.
             exit (normal)
         end
       end),
  Receiver ! { Caller, Ref },
  receive
    { 'DOWN', Mref, _, _, { Receiver, Ref, Result } } ->
      { ok, Result };
    { 'DOWN', Mref, _, _, Reason } ->
      %% The middleman code failed. Or someone did
      %% exit(_, kill) on the middleman process => Reason==killed
      { error, Reason }
  end.
%-=====================================================================-
%- Private -
%-=====================================================================-
%% Resolves a fragment specification to a list of fragment numbers:
%% `all' expands to 1..num_fragments (BaseTable); anything else is
%% returned unchanged.
fragments (BaseTable, FragSpec) ->
  case FragSpec of
    all ->
      Count = num_fragments (BaseTable),
      lists:seq (1, Count);
    _ ->
      FragSpec
  end.
%% Maps a base table name and a fragment number to the name of the table
%% holding that fragment: fragment 1 is the base table itself, fragment
%% N > 1 is `<TableName>_frag<N>'.
frag_table_name (TableName, 1) ->
  TableName;
frag_table_name (TableName, FragNum) when FragNum > 1 ->
  Prefix = atom_to_list (TableName),
  Suffix = integer_to_list (FragNum),
  list_to_atom (lists:append ([ Prefix, "_frag", Suffix ])).
%% True when mnesia reports a non-empty frag_properties list for Table.
is_fragmented (Table) ->
  [] =/= mnesia:table_info (Table, frag_properties).
%% Reads the n_fragments entry from the frag_properties of TableName.
%% Crashes with badmatch if the entry is absent.
num_fragments (TableName) ->
  Props = mnesia:table_info (TableName, frag_properties),
  { n_fragments, Count } = lists:keyfind (n_fragments, 1, Props),
  Count.
%% Builds the list of { FragmentTable, Node } pairs to run the map on.
%% An unfragmented table yields a single pair for the table itself;
%% otherwise one pair per fragment number selected by FragSpec.
select_fragments (BaseTable, FragSpec) ->
  Tables =
    case is_fragmented (BaseTable) of
      true ->
        [ frag_table_name (BaseTable, Num)
          || Num <- fragments (BaseTable, FragSpec) ];
      false ->
        [ BaseTable ]
    end,
  [ { Tab, select_node (Tab) } || Tab <- Tables ].
%% Chooses the node on which to process Table. The local node is used
%% when it holds an active replica; otherwise one of the active replicas
%% is picked uniformly at random.
%%
%% rand is used instead of random: this function runs in a freshly
%% spawned process, where the unseeded random module always starts from
%% the same default seed and would therefore pick the same replica on
%% every call. rand seeds itself per process.
%%
%% Raises error:{ no_active_replicas, Table } when mnesia reports no
%% active replica for Table.
select_node (Table) ->
  case mnesia:table_info (Table, active_replicas) of
    [] ->
      erlang:error ({ no_active_replicas, Table });
    ActiveReplicas ->
      case lists:member (node (), ActiveReplicas) of
        true ->
          node ();
        false ->
          lists:nth (rand:uniform (length (ActiveReplicas)), ActiveReplicas)
      end
  end.
-ifdef (EUNIT).
%% Test helper: creates fragmented table Name with NumFragments fragments
%% spread over NodePool, one ram copy per fragment. Crashes unless mnesia
%% answers { atomic, ok }.
create_table (Name, NodePool, NumFragments) ->
  FragProps = [ { n_fragments, NumFragments },
                { node_pool, NodePool },
                { n_ram_copies, 1 } ],
  TabDef = [ { frag_properties, FragProps } ],
  { atomic, ok } = mnesia:create_table (Name, TabDef).
%% Test helper: deletes table Table and returns whatever
%% mnesia:delete_table/1 returns.
destroy_table (Table) ->
  mnesia:delete_table (Table).
%% Test helper: stops mnesia on all Nodes (see stop_mnesia/1), starts it
%% again on each of them, then tells every node about the others via
%% extra_db_nodes. Crashes on the first node that does not answer as
%% expected.
start_mnesia (Nodes) ->
  stop_mnesia (Nodes),
  Start =
    fun (Node) ->
      ok = rpc:call (Node, mnesia, start, [])
    end,
  Connect =
    fun (Node) ->
      { ok, _ } = rpc:call (Node, mnesia, change_config, [ extra_db_nodes, Nodes ])
    end,
  lists:foreach (Start, Nodes),
  lists:foreach (Connect, Nodes).
%% Test helper: stops mnesia on each of Nodes (expecting `stopped' from
%% every one), then removes the Mnesia* entries in the current working
%% directory with a shell command. Returns the output of that command.
stop_mnesia (Nodes) ->
  Stop =
    fun (Node) ->
      stopped = rpc:call (Node, mnesia, stop, [])
    end,
  lists:foreach (Stop, Nodes),
  os:cmd ("rm -rf Mnesia*").
%-=====================================================================-
%- Tests -
%-=====================================================================-
%% EUnit test generator: checks, on the local node only, that mapreduce/6
%% in `ets' context computes the same sum over table `flass' as a plain
%% mnesia:foldl run through the mnesia_frag access module.
%%
%% Returns a { setup, Setup, Cleanup, Test } fixture: Setup starts mnesia
%% on the local node, Cleanup stops it again, and Test is the fun F below.
%% NOTE(review): ?FORALL and fc:flasscheck/3 are not defined in this file;
%% presumably they come from the flasscheck header whose include is
%% commented out at the top of the module -- confirm before enabling.
local_test_ () ->
F = fun () ->
T = ?FORALL (X,
%% Generator: { Frags, Data } where Frags is in 1..7 and Data is a list of
%% 1..100 { Key, Value } pairs, each component drawn from 1..10000.
fun (_) -> { random:uniform (7),
[ { random:uniform (10000),
random:uniform (10000) }
|| _ <- lists:seq (1, random:uniform (100)) ] }
end,
%% Property: applied immediately to the generated value X.
(fun ({ Frags, Data }) ->
%% Fresh table with Frags fragments, all on the local node.
create_table (flass, [ node () ], Frags),
%% Write every pair as a { flass, K, V } record through mnesia_frag.
ok =
mnesia:activity
(sync_dirty,
fun () ->
lists:foreach
(fun ({ K, V }) ->
mnesia:write ({ flass, K, V })
end,
Data)
end,
[],
mnesia_frag),
%% Sum of the third record element: per-fragment sums in the map step,
%% added together in the reduce step.
{ ok, Sum } =
mapreduce (ets,
flass,
fun (Tab) ->
mnesia:foldl
(fun ({ _, _, Y }, Acc) -> Y + Acc end,
0,
Tab)
end,
fun (Y, Acc) -> Y + Acc end,
0,
all),
%% Reference value: the same sum via a single mnesia:foldl over `flass'
%% using the mnesia_frag access module.
OldSchoolSum =
mnesia:activity
(async_dirty,
fun () ->
mnesia:foldl (fun ({ _, _, Y }, Acc) -> Y + Acc end,
0,
flass)
end,
[],
mnesia_frag),
?assert (Sum =:= OldSchoolSum),
%% Drop the table so the next generated case starts from scratch.
destroy_table (flass),
true
end) (X)),
%% NOTE(review): meaning of the arguments 1000 and 10 is not visible
%% here -- check the flasscheck documentation.
ok = fc:flasscheck (1000, 10, T)
end,
{ setup,
fun () -> start_mnesia ([ node () ]) end,
fun (_) -> stop_mnesia ([ node () ]) end,
F
}.
-endif.
More information about the erlang-questions
mailing list