Logging to one process from thousands: How does it work?

Sean Hinde sean.hinde@REDACTED
Fri Jan 6 20:57:43 CET 2006


Hi Chandru,

On 5 Jan 2006, at 19:19, Sean Hinde wrote:

>
> You could introduce an additional accumulator process which stores  
> log messages while waiting  for a separate disk log owning process  
> to write the current chunk. The protocol to the disk log owning  
> process could be "send async log message, but don't send any more  
> until the disk log process confirms that the write is done with a  
> message back".

OK. How about something like the module that follows at the end of this
mail? I have made it spawn a new process to do the logging, rather than
keeping an additional permanent process and sending it a message, but
only because that is simpler for a proof of concept.

The idea is to start this server in addition to opening the disk logs,
and then route all log writes through it.

It would also be more efficient to have one of these processes per
disk log - then the cached log requests could be written out in one go
using disk_log:log_terms/2.

It would be better still if the OTP disk log could be persuaded to
send a notification when an async log request has actually been
written - then there would be no need for the spawn.

Sean

%%%-------------------------------------------------------------------
%%% File    : nb_log.erl
%%% Author  : Sean Hinde <>
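%%% Description : Non-blocking front end for disk_log writes. Callers
%%%               get 'ok' back immediately; requests are queued while
%%%               a spawned writer process performs the current write.
%%%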
%%% Created :  6 Jan 2006 by Sean Hinde <>
%%%-------------------------------------------------------------------
-module(nb_log).

-behaviour(gen_server).

%% API
-export([start_link/0, alog/2, balog/2, alog_terms/2, balog_terms/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).

%% Server state of the nb_log gen_server.
-record(s,
         {
           %% idle | busy. Set to busy when a writer process is spawned and
           %% back to idle when a writer exits and nothing is queued.
           status,
           %% queue (see the queue module) of {Log, Msg} requests that
           %% arrived while the server was busy.
           cache
          }).

-define(SERVER, ?MODULE).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server, linked to the calling process and
%%              registered locally under the name ?SERVER.
%%--------------------------------------------------------------------
start_link() ->
    Name = {local, ?SERVER},
    InitArg = [],
    Options = [],
    gen_server:start_link(Name, ?MODULE, InitArg, Options).


%%--------------------------------------------------------------------
%% Function: alog(Log, Term) -> ok
%% Description: Asks the server to write Term to Log; the write itself
%%              is done later with disk_log:log/2. The call returns as
%%              soon as the server has accepted the request, not when
%%              the term has been written.
%%--------------------------------------------------------------------
alog(Log, Term) ->
    Request = {log, Log, {log, Term}},
    gen_server:call(?SERVER, Request).

%%--------------------------------------------------------------------
%% Function: balog(Log, Term) -> ok
%% Description: Asks the server to write Term to Log; the write itself
%%              is done later with disk_log:blog/2. The call returns
%%              as soon as the server has accepted the request, not
%%              when the data has been written.
%%--------------------------------------------------------------------
balog(Log, Term) ->
    Request = {log, Log, {blog, Term}},
    gen_server:call(?SERVER, Request).

%%--------------------------------------------------------------------
%% Function: alog_terms(Log, Terms) -> ok
%% Description: Asks the server to write Terms to Log; the write
%%              itself is done later with disk_log:log_terms/2. The
%%              call returns as soon as the server has accepted the
%%              request, not when the terms have been written.
%%--------------------------------------------------------------------
alog_terms(Log, Terms) ->
    Request = {log, Log, {log_terms, Terms}},
    gen_server:call(?SERVER, Request).

%%--------------------------------------------------------------------
%% Function: balog_terms(Log, Terms) -> ok
%% Description: Asks the server to write Terms to Log; the write
%%              itself is done later with disk_log:blog_terms/2. The
%%              call returns as soon as the server has accepted the
%%              request, not when the data has been written.
%%--------------------------------------------------------------------
balog_terms(Log, Terms) ->
    Request = {log, Log, {blog_terms, Terms}},
    gen_server:call(?SERVER, Request).

%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init([]) -> {ok, State}
%% Description: Initial state is idle with an empty request cache.
%%              Exits are trapped so that the termination of a linked
%%              writer process arrives as an 'EXIT' message in
%%              handle_info/2.
%%--------------------------------------------------------------------
init([]) ->
    EmptyCache = queue:new(),
    process_flag(trap_exit, true),
    {ok, #s{cache = EmptyCache, status = idle}}.

%%--------------------------------------------------------------------
%% Function: handle_call({log, Log, Msg}, From, State) ->
%%                                              {reply, ok, NewState}
%% Description: Accepts a log request and replies ok straight away.
%%   idle - a linked writer process is spawned for this one request
%%          and the server becomes busy. The writer always exits with
%%          reason 'done', whatever do_log/2 returned or raised.
%%   busy - the request is appended to the cache; it is picked up in
%%          handle_info/2 when the current writer exits.
%%--------------------------------------------------------------------
handle_call({log, Log, Msg}, _From, #s{status = idle} = State) ->
    Writer = fun() ->
                     catch do_log(Log, Msg),
                     exit(done)
             end,
    spawn_link(Writer),
    {reply, ok, State#s{status = busy}};
handle_call({log, Log, Msg}, _From, #s{status = busy, cache = Pending} = State) ->
    Queued = queue:in({Log, Msg}, Pending),
    {reply, ok, State#s{cache = Queued}}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Request, State) -> {noreply, State}
%% Description: No casts are handled; any cast is dropped and the
%%              state is left untouched.
%%--------------------------------------------------------------------
handle_cast(_Request, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, NewState}
%% Description:
%%   {'EXIT', Pid, done} - a process exited with reason 'done', which
%%       is how the writer processes spawned by this module finish.
%%       If requests were cached in the meantime, one new writer is
%%       spawned for the whole cache and the cache is emptied (status
%%       is left as it was); otherwise the server goes back to idle.
%%   anything else - printed to standard output and ignored.
%%--------------------------------------------------------------------
handle_info({'EXIT', _Pid, done}, State) ->
    Pending = State#s.cache,
    case queue:is_empty(Pending) of
        false ->
            Writer = fun() ->
                             catch do_log_queue(Pending),
                             exit(done)
                     end,
            spawn_link(Writer),
            {noreply, State#s{cache = queue:new()}};
        true ->
            {noreply, State#s{status = idle}}
    end;
handle_info(Info, State) ->
    io:format("Info = ~p~n", [Info]),
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Nothing is cleaned up on shutdown.
%% NOTE(review): the state is ignored here, so requests still sitting
%% in the cache are not written - confirm that this is acceptable.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.

%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, State}
%% Description: The state is carried over a code upgrade unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
%% do_log(Log, {Kind, Data}) -> result of the matching disk_log call
%% Performs one log request by calling the disk_log function named by
%% Kind (log | blog | log_terms | blog_terms) and returns its result.
%% Any other request shape fails with function_clause.
do_log(Log, {blog_terms, BytesList}) ->
    disk_log:blog_terms(Log, BytesList);
do_log(Log, {log_terms, TermList}) ->
    disk_log:log_terms(Log, TermList);
do_log(Log, {blog, Bytes}) ->
    disk_log:blog(Log, Bytes);
do_log(Log, {log, Term}) ->
    disk_log:log(Log, Term).

%% do_log_queue(Queue) -> ok
%% Writes every {Log, Entry} request held in Queue, oldest first, by
%% passing each one to do_log/2. Results of the individual writes are
%% ignored; an exception from do_log/2 is not caught here, so it stops
%% the remaining entries from being written.
do_log_queue(Queue) ->
    write_entries(queue:to_list(Queue)).

%% Walks the list of pending requests in order.
write_entries([{Log, Entry} | Rest]) ->
    do_log(Log, Entry),
    write_entries(Rest);
write_entries([]) ->
    ok.





More information about the erlang-questions mailing list