[erlang-questions] gen_cron question
Paul Mineiro
paul-trapexit@REDACTED
Mon Mar 3 08:44:43 CET 2008
Hey db,
Each gen_cron instance is a single something that needs to run
periodically and without overlap. Like gen_server, implementation is
split between a container and a callback module. You define the callback
module; it's the gen_server interface re-exported with an extra method
called handle_tick (which is the "thing-to-do-periodically"). In the
tests you can see an example of a gen_cron being started up
(http://code.google.com/p/gencron/source/browse/trunk/src/gen_cron.erl#241)
and the callback module lives in the tests/ directory
(http://code.google.com/p/gencron/source/browse/trunk/tests/gen_cron_test.erl).
You pretty much want to define something like gen_cron_test.erl, which
takes an mnesia table name as an arguement to init/1, and which does
mnesia purging stuff in handle_tick/2.
Interestingly purging mnesia tables is the task we made gen_cron for
internally. In fact we have this thing called gen_expire built lying
around the office that I haven't had a chance to open source yet, which is
probably doing something close to what you want. I'm attaching the key
files from that, hopefully they are pedagogical.
-- p
On Sun, 2 Mar 2008, db wrote:
> I need to purge 10 mnesia tables on day/hourly interval. I am looking at
> http://code.google.com/p/gencron/ to accomplish this. Documentation says
> gen_cron pretty much like gen_server. I am not an expert in gen_server. I
> want to create 1 process for each table to perform purging.
>
> Do I create 10 process intially and have them sleep on the interval or
> gen_cron starts these 10 process on interval?
>
> Any sample gen_cron example out there?
>
> Gen_cron seems to consider a task as a module. What if I want to run two
> different modules, do I start two different gen_cron with two different
> name?
>
>
> --
> rk
>
> That which we persist in doing becomes easier for us to do; not that the
> nature of the thing itself is changed, but that our power to do is
> increased.
> -Ralph Waldo Emerson
>
Optimism is an essential ingredient of innovation. How else can the
individual favor change over security?
-- Robert Noyce
-------------- next part --------------
-module (gen_expire_test).
-export ([ start_link/3, stop/1 ]).
-behaviour (gen_expire).
-export ([ activity_context/2,
first/2,
next/3,
delete/3,
finish/1,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3 ]).
-record (mega, { expect }).
-include ("../src/gen_expire.hrl").
%-=====================================================================-
%- Public -
%-=====================================================================-
start_link (Interval, Limit, Tab) ->
gen_expire:start_link (?MODULE, Interval, [ Limit, Tab ], []).
stop (ServerRef) ->
gen_server:cast (ServerRef, stop).
%-=====================================================================-
%- gen_expire callbacks -
%-=====================================================================-
activity_context (_Table, State = #mega{ expect = Expect })
when (Expect =:= activity_context) orelse
(Expect =:= finish) orelse
(is_tuple (Expect) andalso element (1, Expect) =:= next) ->
{ sync_dirty, State#mega{ expect = first } }.
first (Table, State = #mega{ expect = first }) ->
case mnesia:first (Table) of
'$end_of_table' ->
{ end_of_table, State#mega{ expect = finish } };
Key ->
{ ok, Key, State#mega{ expect = { next, Table, Key } } }
end.
next (Table, Key, State = #mega{ expect = { next, Table, Key } }) ->
case mnesia:next (Table, Key) of
'$end_of_table' ->
{ end_of_table, State#mega{ expect = { delete, Table, Key } } };
Next ->
{ ok, Next, State#mega{ expect = { delete, Table, Key } } }
end.
delete (Table, Key, State = #mega{ expect = { delete, Table, Key } }) ->
Expect =
case mnesia:next (Table, Key) of
'$end_of_table' -> finish;
Next -> { next, Table, Next }
end,
mnesia:delete (Table, Key, write),
mnesia:delete (list_to_atom (atom_to_list (Table) ++ "_dup"), Key, write),
{ ok, State#mega{ expect = Expect } }.
finish (#mega{ expect = Expect }) when Expect =:= finish orelse
(is_tuple (Expect) andalso
element (1, Expect) =:= next) ->
timer:sleep (10).
init ([ Limit, Tab ]) ->
{ ok,
[ #expirespec{ table = Tab, max_bytes_per_box = Limit } ],
#mega{ expect = activity_context } }.
handle_call (_Request, _From, State) -> { noreply, State }.
handle_cast (stop, State) -> { stop, normal, State };
handle_cast (_Request, State) -> { noreply, State }.
handle_info (_Msg, State) -> { noreply, State }.
terminate (_Reason, _State) -> ok.
code_change (_OldVsn, State, _Extra) -> { ok, State }.
-------------- next part --------------
-module (gen_expire).
-export ([ force_run/1,
start/4,
start/5,
start_link/4,
start_link/5 ]).
%-behaviour (behaviour).
-export ([ behaviour_info/1 ]).
-behaviour (gen_cron).
-export ([ init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
handle_tick/2 ]).
%-behaviour (gen_expire).
-export ([ activity_context/2,
first/2,
next/3,
delete/3,
finish/1 ]).
-include_lib ("flasscheck/include/quickcheck.hrl").
-include_lib ("eunit/include/eunit.hrl").
-include ("gen_expire.hrl").
-define (is_timeout (X), (((X) =:= infinity) orelse
(is_integer (X) andalso (X) > 0))).
-record (genexpire, { module, speclist, state }).
%% @type expirespec() = { expirespec, Table::atom (), MaxBytesPerBox::integer () }. This is the record type #expirespec{ table, max_bytes_per_box }.
%-=====================================================================-
%- Public -
%-=====================================================================-
%% @hidden
behaviour_info (callbacks) ->
[ { activity_context, 2 },
{ first, 2 },
{ next, 3 },
{ delete, 3 },
{ finish, 1 },
{ init, 1 },
{ handle_call, 3 },
{ handle_cast, 2 },
{ handle_info, 2 },
{ terminate, 2 },
{ code_change, 3 } ];
behaviour_info (_Other) ->
undefined.
%% @spec force_run (ServerRef) -> { ok, Pid } | { underway, Pid }
%% @doc Schedule an immediate expiration. If the process is already
%% executing then { underway, Pid } is returned.
%% @end
force_run (ServerRef) ->
gen_cron:force_run (ServerRef).
%% @spec start (Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start/3. Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end
start (Module, Interval, Args, Options) when ?is_timeout (Interval) ->
gen_cron:start (?MODULE, Interval, [ Module | Args ], Options).
%% @spec start (ServerName, Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start/4. Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end
start (ServerName, Module, Interval, Args, Options) when ?is_timeout (Interval) ->
gen_cron:start (ServerName, ?MODULE, Interval, [ Module | Args ], Options).
%% @spec start_link (Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start_link/3. Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end
start_link (Module, Interval, Args, Options) when ?is_timeout (Interval) ->
gen_cron:start_link (?MODULE, Interval, [ Module | Args ], Options).
%% @spec start_link (ServerName, Module, Interval::integer (), Args, Options) -> Result
%% @doc The analog to gen_server:start_link/4. Takes an extra argument
%% Interval which is the periodic expiration interval in milliseconds.
%% @end
start_link (ServerName, Module, Interval, Args, Options) when ?is_timeout (Interval) ->
gen_cron:start_link (ServerName,
?MODULE,
Interval,
[ Module | Args ],
Options).
%-=====================================================================-
%- gen_expire callbacks -
%-=====================================================================-
%% @spec activity_context (atom (), state ()) -> { async_dirty | sync_dirty | transaction | sync_transaction | ets, NewState::state () }
%% @doc Indicate what activity context to use for expiring table Table. Note
%% that gen_expire holds a global lock on Table which precludes
%% other gen_expire instances from expiring the table simultaneously.
%% @end
activity_context (_Table, _State) ->
erlang:throw (not_implemented).
%% @spec first (atom (), state ()) -> { ok, Key::any (), NewState::state () } | { end_of_table, NewState::state () }
%% @doc Retrieve the first key from the table (fragment). "First" here means
%% "in order of desired expiration".
%% @end
first (_Table, _State) ->
erlang:throw (not_implemented).
%% @spec next (atom (), any (), state ()) -> { ok, NextKey::any (), NewState::state () } | { end_of_table, NewState::state () }
%% @doc Retrieve the next key from the table (fragment). "Next" here means
%% "in order of desired expiration".
%% @end
next (_Table, _Key, _State) ->
erlang:throw (not_implemented).
%% @spec delete (atom (), any (), state ()) -> { ok, NewState::state () }
%% @doc Delete the specified key from the table (fragment).
%% @end
delete (_Table, _Key, _State) ->
erlang:throw (not_implemented).
%% @spec finish (state ()) -> void
%% @doc Called when gen_expire has finished an expiration run.
%% Note that while the state is threaded through
%% a complete expiration run, state changes at the end of the expiration
%% run are discarded because the expiration occurs in seperate process.
%% Therefore this is the opportunity to record any interesting state
%% changes, e.g., log statistics.
%% @end
finish (_State) ->
erlang:throw (not_implemented).
%-=====================================================================-
%- gen_cron callbacks -
%-=====================================================================-
%% @spec init (Args) -> result ()
%% result () = { ok, Tabs::list (#expirespec{}), State::any () } |
%% { ok, Tabs::list (#expirespec{}), State::any (), Timeout::integer () } |
%% { stop, Reason::any () } |
%% ignore
%% @doc Initialization routine. Like Module:init/1 for gen_server, except
%% that a list of #expirespec{} is returned to control expiration behavior.
%% @end
init ([ Module | Args ]) ->
case Module:init (Args) of
{ ok, Speclist, State } ->
{ ok, #genexpire{ module = Module, speclist = Speclist, state = State } };
{ ok, Speclist, State, Timeout } ->
{ ok,
#genexpire{ module = Module, speclist = Speclist, state = State },
Timeout };
R ->
R
end.
%% @spec handle_call (Request, From, State) -> Result
%% @doc Just like the gen_server version.
%% @end
handle_call (Request, From, State) ->
wrap ((State#genexpire.module):handle_call (Request,
From,
State#genexpire.state),
State).
%% @spec handle_cast (Request, State) -> Result
%% @doc Just like the gen_server version.
%% @end
handle_cast (Request, State) ->
wrap ((State#genexpire.module):handle_cast (Request, State#genexpire.state),
State).
%% @spec handle_info (Msg, State) -> Result
%% @doc Just like the gen_server version.
%% @end
handle_info (Msg, State) ->
wrap ((State#genexpire.module):handle_info (Msg, State#genexpire.state),
State).
%% @spec code_change (OldVsn, State, Extra) -> Result
%% @doc Just like the gen_server version.
%% @end
code_change (OldVsn, State, Extra) ->
{ ok, NewState } =
(State#genexpire.module):code_change (OldVsn,
State#genexpire.state,
Extra),
{ ok, State#genexpire{ state = NewState } }.
%% @spec terminate (Result, State) -> Result
%% @doc Just like the gen_server version, except that
%% if a process is running, we wait for it to terminate
%% (prior to calling the module's terminate).
%% @end
terminate (Reason, State) ->
(State#genexpire.module):terminate (Reason, State#genexpire.state).
%% @hidden
handle_tick (_Reason, State) ->
FinalState =
lists:foldl (fun (#expirespec{ table = Table,
max_bytes_per_box = MaxBytes },
State2) ->
LocalFragments = local_fragments (Table),
case LocalFragments of
[] ->
State2;
_ ->
MaxFragBytes = MaxBytes div length (LocalFragments),
lists:foldl (fun (F, State3) ->
expire_frag (F, MaxFragBytes, State3)
end,
State2,
LocalFragments)
end
end,
State,
State#genexpire.speclist),
(FinalState#genexpire.module):finish (FinalState#genexpire.state).
%-=====================================================================-
%- Private -
%-=====================================================================-
expire_frag (Table, MaxFragBytes, State) ->
LockId = { { ?MODULE, Table }, self () },
global:set_lock (LockId),
try
{ Context, NewState } =
(State#genexpire.module):activity_context (Table, State#genexpire.state),
mnesia:activity
(Context,
fun () ->
First = (State#genexpire.module):first (Table, NewState),
case First of
{ end_of_table, NewState2 } ->
State#genexpire{ state = NewState2 };
{ ok, _, NewState2 } ->
expire_frag (Table,
MaxFragBytes,
State#genexpire{ state = NewState2 },
First)
end
end)
after
global:del_lock (LockId)
end.
expire_frag (_Table, _MaxFragBytes, State, { end_of_table, _ }) ->
State;
expire_frag (Table, MaxFragBytes, State, { ok, Key, _ }) ->
case mnesia:table_info (Table, memory) of
N when N > MaxFragBytes ->
Next = (State#genexpire.module):next (Table, Key, State#genexpire.state),
NewState = case Next of { end_of_table, X } -> X; { ok, _, X } -> X end,
{ ok, NewState2 } = (State#genexpire.module):delete (Table,
Key,
NewState),
expire_frag (Table,
MaxFragBytes,
State#genexpire{ state = NewState2 },
Next);
_ ->
State
end.
frag_table_name (Tab, 1) ->
Tab;
frag_table_name (Tab, N) ->
list_to_atom (atom_to_list (Tab) ++ "_frag" ++ integer_to_list (N)).
is_local (TableName) ->
lists:member (node (), mnesia:table_info (TableName, disc_copies)) orelse
lists:member (node (), mnesia:table_info (TableName, disc_only_copies)) orelse
lists:member (node (), mnesia:table_info (TableName, ram_copies)).
local_fragments (TableName) ->
[ F || N <- lists:seq (1, num_fragments (TableName)),
F <- [ frag_table_name (TableName, N) ],
is_local (F) ].
num_fragments (Tablename) ->
{ value, { n_fragments, N } } =
lists:keysearch (n_fragments,
1,
mnesia:table_info (Tablename, frag_properties)),
N.
wrap ({ reply, Reply, NewState }, State) ->
{ reply, Reply, State#genexpire{ state = NewState } };
wrap ({ reply, Reply, NewState, Timeout }, State) ->
{ reply, Reply, State#genexpire{ state = NewState }, Timeout };
wrap ({ noreply, NewState }, State) ->
{ noreply, State#genexpire{ state = NewState } };
wrap ({ noreply, NewState, Timeout }, State) ->
{ noreply, State#genexpire{ state = NewState }, Timeout };
wrap ({ stop, Reason, Reply, NewState }, State) ->
{ stop, Reason, Reply, State#genexpire{ state = NewState } };
wrap ({ stop, Reason, NewState }, State) ->
{ stop, Reason, State#genexpire{ state = NewState } }.
-ifdef (EUNIT).
random_atom (Size) ->
list_to_atom (random_string (Size)).
random_string (Size) ->
[ $a + random:uniform ($z - $a) - 1 || _ <- lists:seq (1, Size) ].
%-=====================================================================-
%- Tests -
%-=====================================================================-
expire_test_ () ->
F = fun () ->
T = ?FORALL (X,
fun (Size) ->
{ random_atom (Size),
random:uniform (Size),
random:uniform (8) =:= 1,
case random:uniform (8) of
1 -> all;
2 -> none;
_ -> random:uniform (Size)
end,
[ { N, random_string (Size) }
|| N <- lists:seq (1, Size) ] }
end,
(fun ({ Tab, Frags, Empty, Keep, Terms }) ->
TabDup = list_to_atom (atom_to_list (Tab) ++ "_dup"),
{ atomic, ok } =
mnesia:create_table (Tab,
[ { frag_properties, [
{ n_fragments, Frags } ] } ]),
{ atomic, ok } =
mnesia:create_table (TabDup,
[ { record_name, Tab },
{ frag_properties, [
{ n_fragments, Frags } ] } ]),
InitSize = mnesia:table_info (Tab, memory),
if Empty ->
Sizes = [];
true ->
Sizes =
[ begin
mnesia:dirty_write (Tab, { Tab, Key, Value }),
mnesia:dirty_write (TabDup, { Tab, Key, Value }),
mnesia:table_info (Tab, memory)
end ||
{ Key, Value } <- Terms ]
end,
{ ok, Pid } =
gen_expire_test:start_link
(1000,
case { Keep, Empty } of
{ _, true } -> Frags * InitSize;
{ all, false } -> Frags * lists:last (Sizes);
{ none, false } -> Frags * InitSize;
{ _, false } -> Frags * lists:nth (Keep, Sizes)
end,
Tab),
{ ok, ExpPid } = gen_expire:force_run (Pid),
{ underway, ExpPid } = gen_expire:force_run (Pid),
MRef = erlang:monitor (process, ExpPid),
receive
{ 'DOWN', MRef, _, _, _ } -> ok
end,
?assert (mnesia:table_info (Tab, memory) =:=
case { Keep, Empty } of
{ _, true } -> InitSize;
{ all, false } -> lists:last (Sizes);
{ none, false } -> InitSize;
{ _, false } -> lists:nth (Keep, Sizes)
end),
?assert (mnesia:table_info (TabDup, memory) =:=
mnesia:table_info (Tab, memory)),
gen_expire_test:stop (Pid),
MRef2 = erlang:monitor (process, Pid),
receive
{ 'DOWN', MRef2, _, _, _ } -> ok
end,
mnesia:delete_table (Tab),
mnesia:delete_table (TabDup),
true
end) (X)),
ok = fc:flasscheck (200, 10, T)
end,
{ setup,
fun () -> os:cmd ("rm -rf Mnesia*"), mnesia:start () end,
fun (_) -> mnesia:stop (), os:cmd ("rm -rf Mnesia*") end,
{ timeout, 60, F }
}.
-endif.
-------------- next part --------------
-ifndef (GENEXPIRE_HRL).
-define (GENEXPIRE_HRL, true).
-record (expirespec, { table, max_bytes_per_box }).
-endif.
More information about the erlang-questions
mailing list