[erlang-questions] sending data down the wire in mysql vs. mnesia

Paul Mineiro paul-trapexit@REDACTED
Mon Dec 3 19:08:29 CET 2007


That do_select () code is very interesting.

Attached is the kind of thing I'm trying to do right now.  I don't see
anything in the mnesia access API which corresponds: I need to reduce the
number of results prior to sending across the wire.  The mnesia access API
seems to be geared towards emulating a table (which is quite reasonable!).
I suppose I could overload the concept of mnesia:read () so that reading
special keys has the effect of computing an aggregate via map-reduce.  Is
this what you had in mind?

I think I should be able to gussy up the attached code using knowledge
gleaned from understanding do_select () to at least get something like a
single transactional context for this operation (I only really use it with
ets context anyway, but it's fun to think about).

Thanks!

-- p

On Mon, 3 Dec 2007, Ulf Wiger wrote:

> If you look into the mnesia_frag module, you will see how
> mnesia does distributed select (mnesia_frag:do_select()).
>
> In order to sustain this level of trickery, one must know
> exactly how mnesia transactions work, but my suggestion
> was basically that you make a copy of mnesia_frag and
> extend this code.
>
> BR,
> Ulf W
>
>
> 2007/12/3, Paul Mineiro <paul-trapexit@REDACTED>:
> > The mnesia access API seems too low level to pull this off, since it
> > intercepts individual reads and writes.
> >
> > What I'd like is something like rpc:pmap/3 for mnesia (fragmented tables);
> > in this case I can't hand roll it because the transaction context does not
> > carry over through an rpc:call (and it's not clear how the side effect of
> > doing an rpc call interacts with transaction retry mechanism anyway;
> > seems like mnesia should be aware of this explicitly for it to work out).
> >
> > What I can do right now is use the rpc module to call mnesia:activity for
> > me on the remote nodes, and then reduce the collection of results.  This is
> > ok but 1) I have to figure out where the active replicas are for the
> > fragments I care about so location transparency is gone and 2) I can't
> > have a single transaction for all the activity.
> >
> > -- p
> >
> > On Sun, 2 Dec 2007, Ulf Wiger wrote:
> >
> > > 2007/12/1, Paul Mineiro <paul-trapexit@REDACTED>:
> > > > With a fragmented MySQL table, I can execute a stored procedure
> > > > on each MySQL fragment, and then reduce the collection of results in the
> > > > client.  The only thing that travels over the wire is the collection of
> > > > results, one for each fragment.
> > > >
> > > > I'm looking for something similar from Mnesia.  Erlang has all the
> > > > building blocks to put it together (so I have), but I can see an
> > > > opportunity to come up with something standard within Mnesia which is
> > > > reasonably general and would return me to a more location-transparent
> > > > style of programming.
> > >
> > > You can modify mnesia_frag and tell mnesia to use your version instead
> > > of the default. How to do this is documented in mnesia. The downside is
> > > of course that you'd have to duplicate/maintain a whole bunch of other logic
> > > as well, just to make this small change, but in the process of doing this,
> > > you may arrive at some suggestion on how to make this parameterizable.
> > >
> > > BR,
> > > Ulf W
> > >
> >
> > Optimism is an essential ingredient of innovation. How else can the
> > individual favor change over security?
> >
> >   -- Robert Noyce
> > _______________________________________________
> > erlang-questions mailing list
> > erlang-questions@REDACTED
> > http://www.erlang.org/mailman/listinfo/erlang-questions
> >
>

Optimism is an essential ingredient of innovation. How else can the
individual favor change over security?

  -- Robert Noyce
-------------- next part --------------
-module (mapreduce).
%-include_lib ("flasscheck/include/quickcheck.hrl").
%-include_lib ("eunit/include/eunit.hrl").
-export ([ mapreduce/6 ]).

%-=====================================================================-
%-                                Public                               -
%-=====================================================================-

%% @spec mapreduce (context (), atom (), map (), reduce (), Acc, frag_spec ()) ->
%%         { ok, Acc } | { error, Reason }
%%    context () = ets | async_dirty | sync_dirty | transaction | sync_transaction
%%    map () = fun (Table) -> X
%%    reduce () = fun (X, Acc) -> Acc
%%    frag_spec () = all | [ integer () ]
%% @doc Map-reduce over the fragments of BaseTable.
%%
%% For each fragment selected by FragSpec (all fragments, or the listed
%% fragment numbers), Map is run remotely as
%% mnesia:activity (Context, Map, [ FragmentTable ]) on a node chosen by
%% select_fragments/2.  Only Map's results travel back; they are folded, in
%% fragment order, into Acc0 with Reduce.
%%
%% Returns { ok, Acc } on success, { error, { badrpc, Reason } } if any remote
%% call fails, and { error, Reason } if the worker process dies for any other
%% reason (for example Reduce raising, or the worker being killed).
%%
%% transaction and sync_transaction don't make much sense, since
%% transactions would be acquired on a fragment-by-fragment basis.

mapreduce (Context, BaseTable, Map, Reduce, Acc0, FragSpec)
  when ((Context =:= ets) orelse (Context =:= async_dirty) orelse
        (Context =:= sync_dirty) orelse (Context =:= transaction) orelse
        (Context =:= sync_transaction)),
       is_atom (BaseTable),
       is_function (Map, 1),
       is_function (Reduce, 2),
       ((FragSpec =:= all) orelse is_list (FragSpec)) ->
  % c.f. gen_server:do_multi_call/4.  The rpc calls are issued from a
  % monitored middleman, so their replies and any exit signals are delivered
  % to it rather than to the caller's mailbox.

  Ref = make_ref (),
  Caller = self (),
  { Receiver, Mref } =
    erlang:spawn_monitor
      (fun () ->
         %% Middleman process. Should be insensitive to regular
         %% exit signals. The synchronization is needed in case
         %% the receiver would exit before the caller started
         %% the monitor.
         process_flag (trap_exit, true),
         CallerMref = erlang:monitor (process, Caller),
         receive
           { Caller, Ref } ->
             Keys =
               [ rpc:async_call (Node,
                                 mnesia,
                                 activity,
                                 [ Context, Map, [ F ] ])
                 || { F, Node } <- select_fragments (BaseTable, FragSpec) ],

             % Wait for every fragment; a failed remote call aborts the whole
             % operation instead of reaching Reduce as a bogus result.
             Results = [ yield_result (K) || K <- Keys ],
             Result = lists:foldl (Reduce, Acc0, Results),

             % The result travels back to the caller as our exit reason.
             exit ({ self (), Ref, Result });
           { 'DOWN', CallerMref, _, _, _ } ->
             %% Caller died before sending us the go-ahead.
             %% Give up silently.
             exit (normal)
         end
      end),
  Receiver ! { Caller, Ref },
  receive
    { 'DOWN', Mref, _, _, { Receiver, Ref, Result } } ->
      { ok, Result };
    { 'DOWN', Mref, _, _, Reason } ->
      %% The middleman failed: a remote call returned badrpc, Reduce
      %% raised, or someone did exit(_, kill) on it => Reason==killed.
      { error, Reason }
  end.

%% Wait for one rpc:async_call/4 promise.  Returns the remote result, or
%% exits the calling process with { badrpc, Reason } if the call failed.
yield_result (Key) ->
  case rpc:yield (Key) of
    { badrpc, Reason } -> exit ({ badrpc, Reason });
    Value -> Value
  end.

%-=====================================================================-
%-                               Private                               -
%-=====================================================================-

%% Expand a fragment spec into fragment numbers: 'all' becomes 1..N, where N
%% is the table's fragment count; any other spec is taken to already be a
%% list of fragment numbers and is returned as-is.
fragments (Table, Spec) ->
  case Spec of
    all -> lists:seq (1, num_fragments (Table));
    Numbers -> Numbers
  end.

%% Table name of fragment FragNum of TableName: fragment 1 is the base table
%% itself, and fragment N > 1 is the atom '<TableName>_fragN'.
frag_table_name (BaseName, FragNum) when FragNum > 1 ->
  Suffix = "_frag" ++ integer_to_list (FragNum),
  list_to_atom (atom_to_list (BaseName) ++ Suffix);
frag_table_name (BaseName, 1) ->
  BaseName.

%% True when Table has fragmentation properties, i.e. its frag_properties
%% table_info item is anything other than the empty list.
is_fragmented (Table) ->
  mnesia:table_info (Table, frag_properties) =/= [].

%% Number of fragments of TableName, read from the n_fragments entry of its
%% frag_properties.  Crashes with {badmatch, false} if that entry is absent.
num_fragments (TableName) ->
  FragProps = mnesia:table_info (TableName, frag_properties),
  % lists:keyfind/3 replaces the legacy lists:keysearch/3 and its
  % { value, _ } wrapper.
  { n_fragments, NFrags } = lists:keyfind (n_fragments, 1, FragProps),
  NFrags.

%% Resolve BaseTable and FragSpec into [ { FragmentTable, Node } ], where Node
%% is the node picked by select_node/1 for that fragment.  A table without
%% fragmentation properties counts as a single fragment, and FragSpec is
%% ignored for it.
select_fragments (BaseTable, FragSpec) ->
  Tables =
    case is_fragmented (BaseTable) of
      true ->
        FragNums = fragments (BaseTable, FragSpec),
        lists:map (fun (N) -> frag_table_name (BaseTable, N) end, FragNums);
      false ->
        [ BaseTable ]
    end,
  [ { Tab, select_node (Tab) } || Tab <- Tables ].

%% Choose the node that will run the map step for Table.  If the local node
%% holds an active replica, it is used.  Otherwise a uniformly random active
%% replica is chosen.  Exits with { no_active_replicas, Table } when the table
%% has no active replica anywhere.
select_node (Table) ->
  Here = node (),
  case mnesia:table_info (Table, active_replicas) of
    [] ->
      exit ({ no_active_replicas, Table });
    Replicas ->
      case lists:member (Here, Replicas) of
        true ->
          Here;
        false ->
          % rand seeds itself per process.  The old random module fell back
          % to a fixed default seed in every fresh process, and mapreduce/6
          % calls this from a freshly spawned middleman, so random:uniform
          % always picked the same replica.
          lists:nth (rand:uniform (length (Replicas)), Replicas)
      end
  end.

-ifdef (EUNIT).

%% Test helper: create table Name with NumFragments fragments drawn from
%% NodePool, keeping one RAM copy of each fragment.  Crashes unless mnesia
%% reports { atomic, ok }.
create_table (Name, NodePool, NumFragments) ->
  FragProps = [ { n_fragments, NumFragments },
                { node_pool, NodePool },
                { n_ram_copies, 1 } ],
  { atomic, ok } = mnesia:create_table (Name, [ { frag_properties, FragProps } ]).

%% Test helper: delete table Name and hand back mnesia's answer unchanged.
destroy_table (Name) ->
  Outcome = mnesia:delete_table (Name),
  Outcome.

%% Test helper: restart mnesia on every node in Nodes (stopping it first via
%% stop_mnesia/1), then point each node at all of Nodes through the
%% extra_db_nodes config.  Each remote step must succeed or the helper crashes.
start_mnesia (Nodes) ->
  stop_mnesia (Nodes),
  Start = fun (Node) -> ok = rpc:call (Node, mnesia, start, []) end,
  lists:foreach (Start, Nodes),
  Connect = fun (Node) ->
              { ok, _ } = rpc:call (Node, mnesia, change_config,
                                    [ extra_db_nodes, Nodes ])
            end,
  lists:foreach (Connect, Nodes).

%% Test helper: stop mnesia on every node in Nodes (each must answer
%% 'stopped'), then run "rm -rf Mnesia*" in the local node's working
%% directory only.  Returns that shell command's output.
stop_mnesia (Nodes) ->
  Stop = fun (Node) -> stopped = rpc:call (Node, mnesia, stop, []) end,
  ok = lists:foreach (Stop, Nodes),
  os:cmd ("rm -rf Mnesia*").

%-=====================================================================-
%-                                Tests                                -
%-=====================================================================-

%% Property test on the local node.  For random data, the sum of all values
%% computed fragment-by-fragment with mapreduce/6 (ets context) must equal a
%% plain mnesia:foldl/3 over the table through the mnesia_frag access module.
%% Runs inside a { setup, ... } fixture that (re)starts mnesia on node ()
%% before the test and stops it afterwards.
%% NOTE(review): ?FORALL and fc:flasscheck/3 come from the flasscheck library
%% and ?assert from eunit, but both -include_lib lines at the top of the
%% module are commented out; confirm how these macros are made available
%% when compiling with EUNIT defined.
local_test_ () ->
  F = fun () ->
    T = ?FORALL (X,
                 % Generator (its argument is ignored): { FragmentCount in
                 % 1..7, a list of 1..100 { Key, Value } pairs with Key and
                 % Value in 1..10000 }.
                 fun (_) -> { random:uniform (7),
                              [ { random:uniform (10000), 
                                  random:uniform (10000) }
                                || _ <- lists:seq (1, random:uniform (100)) ] }
                 end,
                 % Property body: build table flass, write the data, compare
                 % the two sums, drop the table, and return true.
                 (fun ({ Frags, Data }) ->
                   create_table (flass, [ node () ], Frags),
                   ok = 
                     mnesia:activity 
                       (sync_dirty,
                        fun () ->
                          lists:foreach 
                            (fun ({ K, V }) -> 
                               mnesia:write ({ flass, K, V }) 
                             end,
                             Data)
                        end,
                        [],
                        mnesia_frag),
                   % Sum of values via mapreduce/6: each fragment folds its
                   % own records, and the per-fragment sums are added up.
                   { ok, Sum } = 
                     mapreduce (ets,
                                flass,
                                fun (Tab) ->
                                  mnesia:foldl 
                                    (fun ({ _, _, Y }, Acc) -> Y + Acc end,
                                     0,
                                     Tab)
                                end,
                                fun (Y, Acc) -> Y + Acc end,
                                0,
                                all),

                   % Reference sum through the ordinary mnesia_frag access
                   % module over the whole fragmented table.
                   OldSchoolSum = 
                     mnesia:activity 
                       (async_dirty,
                        fun () ->
                          mnesia:foldl (fun ({ _, _, Y }, Acc) -> Y + Acc end,
                                        0,
                                        flass)
                        end,
                        [],
                        mnesia_frag),
                    ?assert (Sum =:= OldSchoolSum),
                    % NOTE(review): if the ?assert above fails, this cleanup
                    % is skipped and table flass is left behind, so later
                    % create_table/3 calls would presumably fail too.
                    destroy_table (flass),
                    true
                  end) (X)),

    % NOTE(review): the meaning of flasscheck's 1000 and 10 arguments is not
    % visible here (presumably a trial count and a size parameter — confirm).
    % Many trials may also exceed eunit's default per-test timeout; consider
    % wrapping F in { timeout, Seconds, F }.
    ok = fc:flasscheck (1000, 10, T)
  end,

  { setup,
    fun () -> start_mnesia ([ node () ]) end,
    fun (_) -> stop_mnesia ([ node () ]) end,
    F
  }.

-endif.


More information about the erlang-questions mailing list