[erlang-questions] gen_cron question

Paul Mineiro paul-trapexit@REDACTED
Mon Mar 3 08:44:43 CET 2008


Hey db,

Each gen_cron instance is a single something that needs to run
periodically and without overlap.  Like gen_server, implementation is
split between a container and a callback module.  You define the callback
module; it's the gen_server interface re-exported with an extra method
called handle_tick (which is the "thing-to-do-periodically").  In the
tests you can see an example of a gen_cron being started up
(http://code.google.com/p/gencron/source/browse/trunk/src/gen_cron.erl#241)
and the callback module lives in the tests/ directory
(http://code.google.com/p/gencron/source/browse/trunk/tests/gen_cron_test.erl).
You pretty much want to define something like gen_cron_test.erl, which
takes an mnesia table name as an arguement to init/1, and which does
mnesia purging stuff in handle_tick/2.

Interestingly purging mnesia tables is the task we made gen_cron for
internally.  In fact we have this thing called gen_expire built lying
around the office that I haven't had a chance to open source yet, which is
probably doing something close to what you want.  I'm attaching the key
files from that, hopefully they are pedagogical.

-- p

On Sun, 2 Mar 2008, db wrote:

> I need to purge 10 mnesia tables on day/hourly interval.  I am looking at
> http://code.google.com/p/gencron/ to accomplish this.  Documentation says
> gen_cron pretty much like gen_server. I am not an expert in gen_server.  I
> want to create 1 process for each table to perform purging.
>
> Do I create 10 process intially and have them sleep on the interval or
> gen_cron starts these 10 process on interval?
>
> Any sample gen_cron example out there?
>
> Gen_cron seems to consider a task as a module.  What if I want to run two
> different modules, do I start two different gen_cron with two different
> name?
>
>
> --
> rk
>
> That which we persist in doing becomes easier for us to do; not that the
> nature of the thing itself is changed, but that our power to do is
> increased.
> -Ralph Waldo Emerson
>

Optimism is an essential ingredient of innovation. How else can the
individual favor change over security?

  -- Robert Noyce
-------------- next part --------------
-module (gen_expire_test).
-export ([ start_link/3, stop/1 ]).
-behaviour (gen_expire).
-export ([ activity_context/2,
           first/2,
           next/3,
           delete/3,
           finish/1,
           init/1,
           handle_call/3,
           handle_cast/2,
           handle_info/2,
           terminate/2,
           code_change/3 ]).

-record (mega, { expect }).

-include ("../src/gen_expire.hrl").

%-=====================================================================-
%-                                Public                               -
%-=====================================================================-

start_link (Interval, Limit, Tab) ->
  gen_expire:start_link (?MODULE, Interval, [ Limit, Tab ], []).

stop (ServerRef) ->
  gen_server:cast (ServerRef, stop).

%-=====================================================================-
%-                         gen_expire callbacks                        -
%-=====================================================================-

activity_context (_Table, State = #mega{ expect = Expect }) 
  when (Expect =:= activity_context) orelse 
       (Expect =:= finish) orelse
       (is_tuple (Expect) andalso element (1, Expect) =:= next) ->
  { sync_dirty, State#mega{ expect = first } }.

first (Table, State = #mega{ expect = first }) ->
  case mnesia:first (Table) of
    '$end_of_table' -> 
      { end_of_table, State#mega{ expect = finish } };
    Key ->
      { ok, Key, State#mega{ expect = { next, Table, Key } } }
  end.

next (Table, Key, State = #mega{ expect = { next, Table, Key } }) ->
  case mnesia:next (Table, Key) of
    '$end_of_table' -> 
      { end_of_table, State#mega{ expect = { delete, Table, Key } } };
    Next ->
      { ok, Next, State#mega{ expect = { delete, Table, Key } } }
  end.

delete (Table, Key, State = #mega{ expect = { delete, Table, Key } }) ->
  Expect = 
    case mnesia:next (Table, Key) of
      '$end_of_table' -> finish;
      Next -> { next, Table, Next }
    end,
  mnesia:delete (Table, Key, write),
  mnesia:delete (list_to_atom (atom_to_list (Table) ++ "_dup"), Key, write),
  { ok, State#mega{ expect = Expect } }.

finish (#mega{ expect = Expect }) when Expect =:= finish orelse 
                                       (is_tuple (Expect) andalso
                                        element (1, Expect) =:= next) ->
  timer:sleep (10).

init ([ Limit, Tab ]) ->
  { ok, 
    [ #expirespec{ table = Tab, max_bytes_per_box = Limit } ],
    #mega{ expect = activity_context } }.

handle_call (_Request, _From, State) -> { noreply, State }.
handle_cast (stop, State) -> { stop, normal, State };
handle_cast (_Request, State) -> { noreply, State }.
handle_info (_Msg, State) -> { noreply, State }.
terminate (_Reason, _State) -> ok.
code_change (_OldVsn, State, _Extra) -> { ok, State }.
-------------- next part --------------
-module (gen_expire).
-export ([ force_run/1,
           start/4,
           start/5,
           start_link/4,
           start_link/5 ]).
%-behaviour (behaviour).
-export ([ behaviour_info/1 ]).
-behaviour (gen_cron).
-export ([ init/1,
           handle_call/3,
           handle_cast/2,
           handle_info/2,
           terminate/2,
           code_change/3,
           handle_tick/2 ]).
%-behaviour (gen_expire).
-export ([ activity_context/2,
           first/2,
           next/3,
           delete/3,
           finish/1 ]).

-include_lib ("flasscheck/include/quickcheck.hrl").
-include_lib ("eunit/include/eunit.hrl").
-include ("gen_expire.hrl").

-define (is_timeout (X), (((X) =:= infinity) orelse 
                          (is_integer (X) andalso (X) > 0))).

-record (genexpire, { module, speclist, state }).

%% @type expirespec() = { expirespec, Table::atom (), MaxBytesPerBox::integer () }.  This is the record type #expirespec{ table, max_bytes_per_box }.

%-=====================================================================-
%-                                Public                               -
%-=====================================================================-

%% @hidden

behaviour_info (callbacks) ->
  [ { activity_context, 2 },
    { first, 2 },
    { next, 3 },
    { delete, 3 },
    { finish, 1 },
    { init, 1 },
    { handle_call, 3 },
    { handle_cast, 2 },
    { handle_info, 2 },
    { terminate, 2 },
    { code_change, 3 } ];
behaviour_info (_Other) ->
  undefined.

%% @spec force_run (ServerRef) -> { ok, Pid } | { underway, Pid }
%% @doc Schedule an immediate expiration.  If the process is already
%% executing then { underway, Pid } is returned.
%% @end

force_run (ServerRef) ->
  gen_cron:force_run (ServerRef).

%% @spec start (Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start/3.  Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end

start (Module, Interval, Args, Options) when ?is_timeout (Interval) ->
  gen_cron:start (?MODULE, Interval, [ Module | Args ], Options).

%% @spec start (ServerName, Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start/4.  Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end

start (ServerName, Module, Interval, Args, Options) when ?is_timeout (Interval) ->
  gen_cron:start (ServerName, ?MODULE, Interval, [ Module | Args ], Options).

%% @spec start_link (Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start_link/3.  Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end

start_link (Module, Interval, Args, Options) when ?is_timeout (Interval) ->
  gen_cron:start_link (?MODULE, Interval, [ Module | Args ], Options).

%% @spec start_link (ServerName, Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start_link/4.  Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end

start_link (ServerName, Module, Interval, Args, Options) when ?is_timeout (Interval) ->
  gen_cron:start_link (ServerName,
                       ?MODULE,
                       Interval,
                       [ Module | Args ],
                       Options).

%-=====================================================================-
%-                         gen_expire callbacks                        -
%-=====================================================================-

%% @spec activity_context (atom (), state ()) -> { async_dirty | sync_dirty | transaction | sync_transaction | ets, NewState::state () }
%% @doc Indicate what activity context to use for expiring table Table.  Note
%% that gen_expire holds a global lock on Table which precludes 
%% other gen_expire instances from expiring the table simultaneously.
%% @end

activity_context (_Table, _State) ->
  erlang:throw (not_implemented).

%% @spec first (atom (), state ()) -> { ok, Key::any (), NewState::state () } | { end_of_table, NewState::state () } 
%% @doc Retrieve the first key from the table (fragment).  "First" here means
%% "in order of desired expiration".
%% @end

first (_Table, _State) ->
  erlang:throw (not_implemented).

%% @spec next (atom (), any (), state ()) -> { ok, NextKey::any (), NewState::state () } | { end_of_table, NewState::state () }
%% @doc Retrieve the next key from the table (fragment).  "Next" here means
%% "in order of desired expiration".
%% @end

next (_Table, _Key, _State) ->
  erlang:throw (not_implemented).

%% @spec delete (atom (), any (), state ()) -> { ok, NewState::state () }
%% @doc Delete the specified key from the table (fragment).
%% @end

delete (_Table, _Key, _State) ->
  erlang:throw (not_implemented).

%% @spec finish (state ()) -> void
%% @doc Called when gen_expire has finished an expiration run.
%% Note that while the state is threaded through
%% a complete expiration run, state changes at the end of the expiration
%% run are discarded because the expiration occurs in seperate process.
%% Therefore this is the opportunity to record any interesting state 
%% changes, e.g., log statistics.
%% @end

finish (_State) ->
  erlang:throw (not_implemented).

%-=====================================================================-
%-                          gen_cron callbacks                         -
%-=====================================================================-

%% @spec init (Args) -> result ()
%%   result () = { ok, Tabs::list (#expirespec{}), State::any () } |
%%               { ok, Tabs::list (#expirespec{}), State::any (), Timeout::integer () } |
%%               { stop, Reason::any () } |
%%               ignore
%% @doc Initialization routine.  Like Module:init/1 for gen_server, except
%% that a list of #expirespec{} is returned to control expiration behavior.
%% @end

init ([ Module | Args ]) ->
  case Module:init (Args) of
    { ok, Speclist, State } ->
      { ok, #genexpire{ module = Module, speclist = Speclist, state = State } };
    { ok, Speclist, State, Timeout } ->
      { ok, 
        #genexpire{ module = Module, speclist = Speclist, state = State },
        Timeout };
    R ->
      R
  end.

%% @spec handle_call (Request, From, State) -> Result
%% @doc Just like the gen_server version.
%% @end

handle_call (Request, From, State) ->
  wrap ((State#genexpire.module):handle_call (Request,
                                              From,
                                              State#genexpire.state),
        State).

%% @spec handle_cast (Request, State) -> Result
%% @doc Just like the gen_server version.
%% @end

handle_cast (Request, State) ->
  wrap ((State#genexpire.module):handle_cast (Request, State#genexpire.state),
        State).

%% @spec handle_info (Msg, State) -> Result
%% @doc Just like the gen_server version.
%% @end

handle_info (Msg, State) ->
  wrap ((State#genexpire.module):handle_info (Msg, State#genexpire.state),
        State).

%% @spec code_change (OldVsn, State, Extra) -> Result
%% @doc Just like the gen_server version.
%% @end

code_change (OldVsn, State, Extra) -> 
  { ok, NewState } = 
    (State#genexpire.module):code_change (OldVsn,
                                          State#genexpire.state,
                                          Extra),
  { ok, State#genexpire{ state = NewState } }.

%% @spec terminate (Result, State) -> Result
%% @doc Just like the gen_server version, except that
%% if a process is running, we wait for it to terminate
%% (prior to calling the module's terminate).
%% @end

terminate (Reason, State) ->
  (State#genexpire.module):terminate (Reason, State#genexpire.state).

%% @hidden

handle_tick (_Reason, State) ->
  FinalState = 
    lists:foldl (fun (#expirespec{ table = Table, 
                                   max_bytes_per_box = MaxBytes },
                      State2) ->
                   LocalFragments = local_fragments (Table),

                   case LocalFragments of
                     [] ->
                       State2;
                     _ ->
                       MaxFragBytes = MaxBytes div length (LocalFragments),
                       lists:foldl (fun (F, State3) ->
                                      expire_frag (F, MaxFragBytes, State3)
                                    end,
                                    State2,
                                    LocalFragments)
                   end
                 end,
                 State,
                 State#genexpire.speclist),
  (FinalState#genexpire.module):finish (FinalState#genexpire.state).

%-=====================================================================-
%-                               Private                               -
%-=====================================================================-

expire_frag (Table, MaxFragBytes, State) ->
  LockId = { { ?MODULE, Table }, self () },

  global:set_lock (LockId),

  try
    { Context, NewState } = 
      (State#genexpire.module):activity_context (Table, State#genexpire.state), 
    mnesia:activity 
      (Context,
       fun () ->
         First = (State#genexpire.module):first (Table, NewState),
         
         case First of
           { end_of_table, NewState2 } -> 
             State#genexpire{ state = NewState2 };
           { ok, _, NewState2 } ->
              expire_frag (Table,
                           MaxFragBytes,
                           State#genexpire{ state = NewState2 },
                           First)
         end
       end)
  after
    global:del_lock (LockId)
  end.

expire_frag (_Table, _MaxFragBytes, State, { end_of_table, _ }) ->
  State;
expire_frag (Table, MaxFragBytes, State, { ok, Key, _ }) ->
  case mnesia:table_info (Table, memory) of
    N when N > MaxFragBytes ->
      Next = (State#genexpire.module):next (Table, Key, State#genexpire.state),
      NewState = case Next of { end_of_table, X } -> X; { ok, _, X } -> X end,

      { ok, NewState2 } = (State#genexpire.module):delete (Table,
                                                           Key,
                                                           NewState),

      expire_frag (Table, 
                   MaxFragBytes, 
                   State#genexpire{ state = NewState2 },
                   Next);
    _ ->
      State
  end.

frag_table_name (Tab, 1) ->
  Tab;
frag_table_name (Tab, N) ->
  list_to_atom (atom_to_list (Tab) ++ "_frag" ++ integer_to_list (N)).

is_local (TableName) ->
  lists:member (node (), mnesia:table_info (TableName, disc_copies)) orelse
  lists:member (node (), mnesia:table_info (TableName, disc_only_copies)) orelse
  lists:member (node (), mnesia:table_info (TableName, ram_copies)).
                       
local_fragments (TableName) ->
  [ F || N <- lists:seq (1, num_fragments (TableName)),
         F <- [ frag_table_name (TableName, N) ],
         is_local (F) ].

num_fragments (Tablename) ->
  { value, { n_fragments, N } } =
    lists:keysearch (n_fragments,
                     1,
                     mnesia:table_info (Tablename, frag_properties)),
  N.

wrap ({ reply, Reply, NewState }, State) ->
  { reply, Reply, State#genexpire{ state = NewState } };
wrap ({ reply, Reply, NewState, Timeout }, State) ->
  { reply, Reply, State#genexpire{ state = NewState }, Timeout };
wrap ({ noreply, NewState }, State) ->
  { noreply, State#genexpire{ state = NewState } };
wrap ({ noreply, NewState, Timeout }, State) ->
  { noreply, State#genexpire{ state = NewState }, Timeout };
wrap ({ stop, Reason, Reply, NewState }, State) ->
  { stop, Reason, Reply, State#genexpire{ state = NewState } };
wrap ({ stop, Reason, NewState }, State) ->
  { stop, Reason, State#genexpire{ state = NewState } }.

-ifdef (EUNIT).

random_atom (Size) ->
  list_to_atom (random_string (Size)).

random_string (Size) ->
  [ $a + random:uniform ($z - $a) - 1 || _ <- lists:seq (1, Size) ].

%-=====================================================================-
%-                                Tests                                -
%-=====================================================================-

expire_test_ () ->
  F = fun () ->
    T = ?FORALL (X,
                 fun (Size) -> 
                  { random_atom (Size), 
                    random:uniform (Size),
                    random:uniform (8) =:= 1,
                    case random:uniform (8) of 
                      1 -> all;
                      2 -> none;
                      _ -> random:uniform (Size)
                    end,
                    [ { N, random_string (Size) } 
                      || N <- lists:seq (1, Size) ] }
                 end,
                 (fun ({ Tab, Frags, Empty, Keep, Terms }) ->
                    TabDup = list_to_atom (atom_to_list (Tab) ++ "_dup"),

                    { atomic, ok } = 
                       mnesia:create_table (Tab, 
                                            [ { frag_properties, [ 
                                                { n_fragments, Frags } ] } ]),

                    { atomic, ok } = 
                       mnesia:create_table (TabDup, 
                                            [ { record_name, Tab },
                                              { frag_properties, [ 
                                                { n_fragments, Frags } ] } ]),

                    InitSize = mnesia:table_info (Tab, memory),

                    if Empty -> 
                         Sizes = [];
                       true ->
                         Sizes = 
                           [ begin
                               mnesia:dirty_write (Tab, { Tab, Key, Value }),
                               mnesia:dirty_write (TabDup, { Tab, Key, Value }),
                               mnesia:table_info (Tab, memory)
                             end ||
                             { Key, Value } <- Terms ]
                    end,

                    { ok, Pid } = 
                       gen_expire_test:start_link 
                         (1000,
                          case { Keep, Empty } of
                            { _, true } -> Frags * InitSize;
                            { all, false } -> Frags * lists:last (Sizes);
                            { none, false } -> Frags * InitSize;
                            { _, false } -> Frags * lists:nth (Keep, Sizes)
                          end,
                          Tab),

                    { ok, ExpPid } = gen_expire:force_run (Pid),
                    { underway, ExpPid } = gen_expire:force_run (Pid),

                    MRef = erlang:monitor (process, ExpPid),

                    receive
                      { 'DOWN', MRef, _, _, _ } -> ok
                    end,

                    ?assert (mnesia:table_info (Tab, memory) =:=
                             case { Keep, Empty } of
                               { _, true } -> InitSize;
                               { all, false } -> lists:last (Sizes);
                               { none, false } -> InitSize;
                               { _, false } -> lists:nth (Keep, Sizes)
                             end),

                    ?assert (mnesia:table_info (TabDup, memory) =:=
                             mnesia:table_info (Tab, memory)),

                    gen_expire_test:stop (Pid),

                    MRef2 = erlang:monitor (process, Pid),

                    receive
                      { 'DOWN', MRef2, _, _, _ } -> ok
                    end,

                    mnesia:delete_table (Tab),
                    mnesia:delete_table (TabDup),

                    true
                  end) (X)),

    ok = fc:flasscheck (200, 10, T)
  end,

  { setup,
    fun () -> os:cmd ("rm -rf Mnesia*"), mnesia:start () end,
    fun (_) -> mnesia:stop (), os:cmd ("rm -rf Mnesia*") end,
    { timeout, 60, F } 
  }.

-endif.
-------------- next part --------------
-ifndef (GENEXPIRE_HRL).
-define (GENEXPIRE_HRL, true).

-record (expirespec, { table, max_bytes_per_box }).

-endif.


More information about the erlang-questions mailing list