Logging to one process from thousands: How does it work?
Sean Hinde
sean.hinde@REDACTED
Fri Jan 6 20:57:43 CET 2006
Hi Chandru,
On 5 Jan 2006, at 19:19, Sean Hinde wrote:
>
> You could introduce an additional accumulator process which stores
> log messages while waiting for a separate disk log owning process
> to write the current chunk. The protocol to the disk log owning
> process could be "send async log message, but don't send any more
> until the disk log process confirms that the write is done with a
> message back".
OK. How about something like the code at the end of this mail? I
have made it spawn a new process to do the logging rather than
having an additional permanent process and sending it a message, but
only because that is simpler for a proof of concept.
The idea is to start this process in addition to opening the disk
logs, and to route all log writes through it.
It would also be more efficient to have one of these processes per
disk log - writing out the cached log requests could then be done in
a single call to disk_log:log_terms/2.
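As a rough illustration (not part of the module below - it assumes a
per-log variant where the cache is a queue of plain terms destined
for a single disk log, and flush_cache/2 is a made-up helper name):

%% Sketch only: with one caching process per disk log, the whole
%% cache can be written out in one disk_log:log_terms/2 call.
flush_cache(Log, Cache) ->
    disk_log:log_terms(Log, queue:to_list(Cache)).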
It would be better still if the OTP disk_log could be persuaded to
send a notification when an async log request has actually been
written - then there would be no need for the spawn at all.
Sean
%%%-------------------------------------------------------------------
%%% File : nb_log.erl
%%% Author : Sean Hinde <>
%%% Description :
%%%
%%% Created : 6 Jan 2006 by Sean Hinde <>
%%%-------------------------------------------------------------------
-module(nb_log).
-behaviour(gen_server).
%% API
-export([start_link/0, alog/2, balog/2, alog_terms/2, balog_terms/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(s,
        {
          status,   % idle | busy
          cache     % queue of {Log, Msg} requests received while busy
        }).
-define(SERVER, ?MODULE).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

alog(Log, Term) ->
    gen_server:call(?SERVER, {log, Log, {log, Term}}).

balog(Log, Term) ->
    gen_server:call(?SERVER, {log, Log, {blog, Term}}).

alog_terms(Log, Terms) ->
    gen_server:call(?SERVER, {log, Log, {log_terms, Terms}}).

balog_terms(Log, Terms) ->
    gen_server:call(?SERVER, {log, Log, {blog_terms, Terms}}).
%%====================================================================
%% gen_server callbacks
%%====================================================================
init([]) ->
    process_flag(trap_exit, true),
    {ok, #s{ status = idle,
             cache  = queue:new() }}.
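%% When idle, a log request is handed straight to a freshly spawned,
%% linked writer process which makes the (possibly blocking) disk_log
%% call. While that writer is busy, further requests are cached in
%% the queue and flushed once it exits.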
handle_call({log, Log, Msg}, _From, S) ->
    if
        S#s.status == idle ->
            spawn_link(fun() ->
                               catch do_log(Log, Msg),
                               exit(done)
                       end),
            {reply, ok, S#s{ status = busy }};
        S#s.status == busy ->
            Q = queue:in({Log, Msg}, S#s.cache),
            {reply, ok, S#s{ cache = Q }}
    end.
handle_cast(_Msg, S) ->
    {noreply, S}.
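%% The spawned writer exits with reason 'done' once its disk_log call
%% has returned. If anything was cached in the meantime, flush it in
%% a new writer; otherwise go back to idle.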
handle_info({'EXIT', _Pid, done}, S) ->
    case queue:is_empty(S#s.cache) of
        true ->
            {noreply, S#s{ status = idle }};
        false ->
            spawn_link(fun() ->
                               catch do_log_queue(S#s.cache),
                               exit(done)
                       end),
            {noreply, S#s{ cache = queue:new() }}
    end;
handle_info(_Info, S) ->
    io:format("Info = ~p~n", [_Info]),
    {noreply, S}.
terminate(_Reason, _S) ->
    ok.

code_change(_OldVsn, S, _Extra) ->
    {ok, S}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
do_log(Log, {log, Term}) -> disk_log:log(Log, Term);
do_log(Log, {blog, Term}) -> disk_log:blog(Log, Term);
do_log(Log, {log_terms, Terms}) -> disk_log:log_terms(Log, Terms);
do_log(Log, {blog_terms, Terms}) -> disk_log:blog_terms(Log, Terms).
do_log_queue(Queue) ->
    lists:foreach(fun({Log, Entry}) ->
                          do_log(Log, Entry)
                  end, queue:to_list(Queue)).
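For completeness, usage would be along these lines (illustrative
only - the log name, file name and logged term are made up, not part
of the module):

%% Open a disk log, start the caching server, then route writes
%% through nb_log rather than calling disk_log directly.
example() ->
    {ok, _Log} = disk_log:open([{name, my_log}, {file, "my_log.LOG"}]),
    {ok, _Pid} = nb_log:start_link(),
    ok = nb_log:alog(my_log, {event, something_happened}).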