[erlang-bugs] Memory leak in diameter_service module in diameter app (otp_R16B)

Aleksander Nycz Aleksander.Nycz@REDACTED
Mon May 20 11:45:16 CEST 2013


Hello,

I think there is a resource (memory) leak in the diameter_service module.

This module is a gen_server whose state contains the field
watchdogT :: ets:tid(). This ets table contains info about the watchdogs.

The diameter service configuration is:

[{'Origin-Host',  HostName},
 {'Origin-Realm', Realm},
 {'Vendor-Id',    ...},
 {'Product-Name', ...},
 {'Auth-Application-Id', [?DCCA_APP_ID]},
 {'Supported-Vendor-Id', [...]},
 {application, [{alias,      diameterNode},
                {dictionary, dictionaryDCCA},
                {module,     dccaCallback}]},
 {restrict_connections, false}]
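
For reference, the service and transport were brought up roughly like this
(a minimal sketch of the setup; SvcOpts stands for the option list above,
and the transport options are the ones visible in the watchdog records below):

ok = diameter:start(),
ok = diameter:start_service(diameterNode, SvcOpts),
{ok, _Ref} = diameter:add_transport(diameterNode,
                 {listen, [{transport_module, diameter_tcp},
                           {transport_config, [{reuseaddr, true},
                                               {ip, {0,0,0,0}},
                                               {port, 4068}]},
                           {watchdog_timer, 30000},
                           {reconnect_timer, 60000}]}).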

After starting the diameter app and adding the service and transport, the
diameter_service state is:

 > diameter_service:state(diameterNode).
#state{id = {1369,41606,329900},
        service_name = diameterNode,
        service = #diameter_service{pid = <0.1011.0>,
                                    capabilities = #diameter_caps{...},
                                    applications = [#diameter_app{...}]},
        watchdogT = 4194395,peerT = 4259932,shared_peers = 4325469,
        local_peers = 4391006,monitor = false,
        options = [{sequence,{0,32}},
                   {share_peers,false},
                   {use_shared_peers,false},
                   {restrict_connections,false}]}

and the ets table 4194395 contains one record:

 > ets:tab2list(4194395).
[#watchdog{pid = <0.1013.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = initial,
            started = {1369,41606,330086},
            peer = false}]

Next I ran a very simple test using the Seagull simulator. The test scenario
is as follows:

1. seagull: send CER
2. seagull: recv CEA
3. seagull: send CCR (init)
4. seagull: recv CCA (init)
5. seagull: send CCR (update)
6. seagull: recv CCA (update)
7. seagull: send CCR (terminate)
8. seagull: recv CCA (terminate)

During the test there are two watchdogs in the ets table:

 > ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = initial,
            started = {1369,41823,711370},
            peer = false},
  #watchdog{pid = <0.1013.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = okay,
            started = {1369,41606,330086},
            peer = <0.1014.0>}]

After the test, but before the Tw timer has elapsed, there are still two
watchdogs, and this is OK:

 > ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = initial,
            started = {1369,41823,711370},
            peer = false},
  #watchdog{pid = <0.1013.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = down,
            started = {1369,41606,330086},
            peer = <0.1014.0>}]

But when the Tw timer has elapsed, the transport and watchdog processes have
terminated:

 > erlang:is_process_alive(list_to_pid("<0.1014.0>")).
false
 > erlang:is_process_alive(list_to_pid("<0.1013.0>")).
false

yet there are still two watchdogs in the ets table:

 > ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = initial,
            started = {1369,41823,711370},
            peer = false},
  #watchdog{pid = <0.1013.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = down,
            started = {1369,41606,330086},
            peer = <0.1014.0>}]

I think the watchdog entry for <0.1013.0> should be removed when the
watchdog process terminates.
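
A quick way to see the stale entries (a sketch; 4194395 is the watchdogT
table id from the state dump above, and element 2 of the record tuple is
#watchdog.pid):

 > [W || W <- ets:tab2list(4194395),
         not erlang:is_process_alive(element(2, W))].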

I ran the test again, and now there are 3 watchdogs in the ets table:

 > ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = down,
            started = {1369,41823,711370},
            peer = <0.1817.0>},
  #watchdog{pid = <0.1013.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = down,
            started = {1369,41606,330086},
            peer = <0.1014.0>},
  #watchdog{pid = <0.3533.0>,type = accept,
            ref = #Ref<0.0.0.1696>,
            options = [{transport_module,diameter_tcp},
                       {transport_config,[{reuseaddr,true},
                                          {ip,{0,0,0,0}},
                                          {port,4068}]},
                       {capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
                       {watchdog_timer,30000},
                       {reconnect_timer,60000}],
            state = initial,
            started = {1369,42342,845898},
            peer = false}]

The watchdog and transport processes are not alive:

 > erlang:is_process_alive(list_to_pid("<0.1816.0>")).
false
 > erlang:is_process_alive(list_to_pid("<0.1817.0>")).
false


I suggest the following change to correct this problem (file
diameter_service.erl):

$ diff diameter_service.erl diameter_service.erl_ok
1006c1006
< connection_down(#watchdog{state = WS,
---
> connection_down(#watchdog{state = ?WD_OKAY,
1015,1017c1015,1021
<     ?WD_OKAY == WS
<         andalso
<         connection_down(Wd, fetch(PeerT, TPid), S).
---
>     connection_down(Wd, fetch(PeerT, TPid), S);
>
> connection_down(#watchdog{},
>                 To,
>                 #state{})
>   when is_atom(To) ->
>     ok.
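
As I read it, the leak comes from this clause re-inserting the #watchdog{}
record that watchdog_down/2 has just deleted; with the change the record is
only updated (and the peer taken down) when the watchdog was still in state
okay. For reference, the relevant clauses of connection_down/3 in the
attached file read:

connection_down(#watchdog{state = ?WD_OKAY,
                          peer = TPid}
                = Wd,
                To,
                #state{watchdogT = WatchdogT,
                       peerT = PeerT}
                = S)
  when is_atom(To) ->
    insert(WatchdogT, Wd#watchdog{state = To}),
    connection_down(Wd, fetch(PeerT, TPid), S);

connection_down(#watchdog{},
                To,
                #state{})
  when is_atom(To) ->
    ok.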

You can find this solution in the attachment.

Regards
Aleksander Nycz


-- 
Aleksander Nycz
Senior Software Engineer
Telco_021 BSS R&D
Comarch SA
Phone:  +48 12 646 1216
Mobile: +48 691 464 275
website: www.comarch.pl

-------------- next part --------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
%% Implements the process that represents a service.
%%

-module(diameter_service).
-behaviour(gen_server).

%% towards diameter_service_sup
-export([start_link/1]).

%% towards diameter
-export([subscribe/1,
         unsubscribe/1,
         services/0,
         info/2]).

%% towards diameter_config
-export([start/1,
         stop/1,
         start_transport/2,
         stop_transport/2]).

%% towards diameter_peer
-export([notify/2]).

%% towards diameter_traffic
-export([find_incoming_app/4,
         pick_peer/3]).

%% test/debug
-export([services/1,
         subscriptions/1,
         subscriptions/0,
         call_module/3,
         whois/1,
         state/1,
         uptime/1]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").

%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY,    okay).
-define(WD_SUSPECT, suspect).
-define(WD_DOWN,    down).
-define(WD_REOPEN,  reopen).

-type wd_state() :: ?WD_INITIAL
                  | ?WD_OKAY
                  | ?WD_SUSPECT
                  | ?WD_DOWN
                  | ?WD_REOPEN.

-define(DEFAULT_TC,     30000).  %% RFC 3588 ch 2.1
-define(RESTART_TC,      1000).  %% if restart was this recent

%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing it in the ?STATE table and not always going through the
%% service process. In particular, rely on the fact that operations on
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).

%% Maintains state in a table. In contrast to previously, a service's
%% state is not constant and is accessed outside of the service
%% process.
-define(STATE_TABLE, ?MODULE).

%% The default sequence mask.
-define(NOMASK, {0,32}).

%% The default restrict_connections.
-define(RESTRICT, nodes).

%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
   :: T | '_' | '$1' | '$2'.

%% State of service gen_server. Note that the state term itself
%% doesn't change, which is relevant for the stateless application
%% callbacks since the state is retrieved from ?STATE_TABLE from
%% outside the service process. The pid in the service record is used
%% to determine whether or not we need to call the process for a
%% pick_peer callback in the stateful case.
-record(state,
        {id = now(),
         service_name :: diameter:service_name(), %% key in ?STATE_TABLE
         service :: #diameter_service{},
         watchdogT = ets_new(watchdogs) %% #watchdog{} at start
                  :: ets:tid(),
         peerT = ets_new(peers)         %% #peer{pid = TPid} at okay/reopen
              :: ets:tid(),
         shared_peers = ?Dict:new()          %% Alias -> [{TPid, Caps}, ...]
                     :: ets:tid(),
         local_peers  = ?Dict:new()          %% Alias -> [{TPid, Caps}, ...]
                     :: ets:tid(),
         monitor = false :: false | pid(),   %% process to die with
         options
         :: [{sequence, diameter:sequence()}  %% sequence mask
             | {restrict_connections, diameter:restriction()}
             | {share_peers, boolean()}  %% broadcast peers to remote nodes?
             | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
%% shared_peers reflects the peers broadcast from remote nodes.

%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
-record(watchdog,
        {pid  :: match(pid()),
         type :: match(connect | accept),
         ref  :: match(reference()),  %% key into diameter_config
         options :: match([diameter:transport_opt()]),%% from start_transport
         state = ?WD_INITIAL :: match(wd_state()),
         started = now(),      %% at process start
         peer = false :: match(boolean() | pid())}).
                      %% true at accepted, pid() at okay/reopen

%% Record representing a Peer State Machine process implemented by
%% diameter_peer_fsm.
-record(peer,
        {pid   :: pid(),
         apps  :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
         caps  :: #diameter_caps{},
         started = now(),  %% at process start
         watchdog :: pid()}). %% key into watchdogT

%% ---------------------------------------------------------------------------
%% # start/1
%% ---------------------------------------------------------------------------

start(SvcName) ->
    diameter_service_sup:start_child(SvcName).

start_link(SvcName) ->
    Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
    gen_server:start_link(?MODULE, [SvcName], Options).
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.

%% ---------------------------------------------------------------------------
%% # stop/1
%% ---------------------------------------------------------------------------

stop(SvcName) ->
    case whois(SvcName) of
        undefined ->
            {error, not_started};
        Pid ->
            stop(call_service(Pid, stop), Pid)
    end.

stop(ok, Pid) ->
    MRef = erlang:monitor(process, Pid),
    receive {'DOWN', MRef, process, _, _} -> ok end;
stop(No, _) ->
    No.

%% ---------------------------------------------------------------------------
%% # start_transport/3
%% ---------------------------------------------------------------------------

start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
    call_service_by_name(SvcName, {start, T}).

%% ---------------------------------------------------------------------------
%% # stop_transport/2
%% ---------------------------------------------------------------------------

stop_transport(_, []) ->
    ok;
stop_transport(SvcName, [_|_] = Refs) ->
    call_service_by_name(SvcName, {stop, Refs}).

%% ---------------------------------------------------------------------------
%% # info/2
%% ---------------------------------------------------------------------------

info(SvcName, Item) ->
    case lookup_state(SvcName) of
        [#state{} = S] ->
            service_info(Item, S);
        [] ->
            undefined
    end.

%% lookup_state/1

lookup_state(SvcName) ->
    ets:lookup(?STATE_TABLE, SvcName).

%% ---------------------------------------------------------------------------
%% # subscribe/1
%% # unsubscribe/1
%% ---------------------------------------------------------------------------

subscribe(SvcName) ->
    diameter_reg:add({?MODULE, subscriber, SvcName}).

unsubscribe(SvcName) ->
    diameter_reg:del({?MODULE, subscriber, SvcName}).

subscriptions(Pat) ->
    pmap(diameter_reg:match({?MODULE, subscriber, Pat})).

subscriptions() ->
    subscriptions('_').

pmap(Props) ->
    lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).

%% ---------------------------------------------------------------------------
%% # services/1
%% ---------------------------------------------------------------------------

services(Pat) ->
    pmap(diameter_reg:match({?MODULE, service, Pat})).

services() ->
    services('_').

whois(SvcName) ->
    case diameter_reg:match({?MODULE, service, SvcName}) of
        [{_, Pid}] ->
            Pid;
        [] ->
            undefined
    end.

%% ---------------------------------------------------------------------------
%% # pick_peer/3
%% ---------------------------------------------------------------------------

-spec pick_peer(SvcName, AppOrAlias, Opts)
   -> {{TPid, Caps, App}, Mask}
    | false
    | {error, term()}
 when SvcName :: diameter:service_name(),
      AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
      Opts :: tuple(),
      TPid :: pid(),
      Caps :: #diameter_caps{},
      App  :: #diameter_app{},
      Mask :: diameter:sequence().

pick_peer(SvcName, App, Opts) ->
    pick(lookup_state(SvcName), App, Opts).

pick([], _, _) ->
    {error, no_service};

pick([S], App, Opts) ->
    pick(S, App, Opts);

pick(#state{service = #diameter_service{applications = Apps}}
     = S,
     {alias, Alias},
     Opts) ->  %% initial call from diameter:call/4
    pick(S, find_outgoing_app(Alias, Apps), Opts);

pick(_, false, _) ->
    false;

pick(#state{options = [{_, Mask} | _]}
     = S,
     #diameter_app{module = ModX, dictionary = Dict}
     = App0,
     {DestF, Filter, Xtra}) ->
    App = App0#diameter_app{module = ModX ++ Xtra},
    [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
    case pick_peer(App, RealmAndHost, Filter, S) of
        {TPid, Caps} ->
            {{TPid, Caps, App}, Mask};
        false = No ->
            No
    end.

%% ---------------------------------------------------------------------------
%% # find_incoming_app/4
%% ---------------------------------------------------------------------------

-spec find_incoming_app(PeerT, TPid, Id, Apps)
   -> {#diameter_app{}, #diameter_caps{}}  %% connection and suitable app
    | #diameter_caps{}                     %% connection but no suitable app
    | false                                %% no connection
 when PeerT :: ets:tid(),
      TPid  :: pid(),
      Id    :: non_neg_integer(),
      Apps  :: [#diameter_app{}].

find_incoming_app(PeerT, TPid, Id, Apps) ->
    try ets:lookup(PeerT, TPid) of
        [#peer{} = P] ->
            find_incoming_app(P, Id, Apps);
        [] ->             %% transport has gone down
            false
    catch
        error: badarg ->  %% service has gone down (and taken table with it)
            false
    end.

%% ---------------------------------------------------------------------------
%% # notify/2
%% ---------------------------------------------------------------------------

notify(SvcName, Msg) ->
    Pid = whois(SvcName),
    is_pid(Pid) andalso (Pid ! Msg).

%% ===========================================================================
%% ===========================================================================

state(Svc) ->
    call_service(Svc, state).

uptime(Svc) ->
    call_service(Svc, uptime).

%% call_module/3

call_module(Service, AppMod, Request) ->
    call_service(Service, {call_module, AppMod, Request}).

%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------

init([SvcName]) ->
    process_flag(trap_exit, true),  %% ensure terminate(shutdown, _)
    i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).

i(SvcName, true) ->
    {ok, i(SvcName)};
i(_, false) ->
    {stop, {shutdown, already_started}}.

%% ---------------------------------------------------------------------------
%% # handle_call/3
%% ---------------------------------------------------------------------------

handle_call(state, _, S) ->
    {reply, S, S};

handle_call(uptime, _, #state{id = T} = S) ->
    {reply, diameter_lib:now_diff(T), S};

%% Start a transport.
handle_call({start, {Ref, Type, Opts}}, _From, S) ->
    {reply, start(Ref, {Type, Opts}, S), S};

%% Stop transports.
handle_call({stop, Refs}, _From, S) ->
    shutdown(Refs, S),
    {reply, ok, S};

%% pick_peer with mutable state
handle_call({pick_peer, Local, Remote, App}, _From, S) ->
    #diameter_app{mutable = true} = App,  %% assert
    {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};

handle_call({call_module, AppMod, Req}, From, S) ->
    call_module(AppMod, Req, From, S);

handle_call(stop, _From, S) ->
    shutdown(service, S),
    {stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
%% starting a monitor that waits for DOWN before returning.

handle_call(Req, From, S) ->
    unexpected(handle_call, [Req, From], S),
    {reply, nok, S}.

%% ---------------------------------------------------------------------------
%% # handle_cast/2
%% ---------------------------------------------------------------------------

handle_cast(Req, S) ->
    unexpected(handle_cast, [Req], S),
    {noreply, S}.

%% ---------------------------------------------------------------------------
%% # handle_info/2
%% ---------------------------------------------------------------------------

handle_info(T, #state{} = S) ->
    case transition(T,S) of
        ok ->
            {noreply, S};
        {stop, Reason} ->
            {stop, {shutdown, Reason}, S}
    end.

%% transition/2

%% Peer process is telling us to start a new accept process.
transition({accepted, Pid, TPid}, S) ->
    accepted(Pid, TPid, S),
    ok;

%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
    reconnect(Pid, S),
    ok;

%% Watchdog is sending notification of transport death.
transition({close, Pid, Reason}, #state{service_name = SvcName,
                                        watchdogT = WatchdogT}) ->
    #watchdog{state = WS,
              ref = Ref,
              type = Type,
              options = Opts}
        = fetch(WatchdogT, Pid),
    WS /= ?WD_OKAY
        andalso
        send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
    ok;

%% Watchdog is sending notification of a state transition.
transition({watchdog, Pid, {[TPid | Data], From, To}},
           #state{service_name = SvcName,
                  watchdogT = WatchdogT}
           = S) ->
    #watchdog{ref = Ref, type = T, options = Opts}
        = Wd
        = fetch(WatchdogT, Pid),
    watchdog(TPid, Data, From, To, Wd, S),
    send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
    ok;
%% Death of a watchdog process (#watchdog.pid) results in the removal of
%% its peer and any associated conn record when 'DOWN' is received.
%% Death of a peer process (#peer.pid, #watchdog.peer) results in
%% ?WD_DOWN.

%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
    {stop, {monitor, Reason}};

%% Local watchdog process has died.
transition({'DOWN', _, process, Pid, _Reason}, S)
  when node(Pid) == node() ->
    watchdog_down(Pid, S),
    ok;

%% Remote service wants to know about shared peers.
transition({service, Pid}, S) ->
    share_peers(Pid, S),
    ok;

%% Remote service is communicating a shared peer.
transition({peer, TPid, Aliases, Caps}, S) ->
    remote_peer_up(TPid, Aliases, Caps, S),
    ok;

%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
    remote_peer_down(TPid, S),
    ok;

%% Restart after tc expiry.
transition({tc_timeout, T}, S) ->
    tc_timeout(T, S),
    ok;

transition(Req, S) ->
    unexpected(handle_info, [Req], S),
    ok.

%% ---------------------------------------------------------------------------
%% # terminate/2
%% ---------------------------------------------------------------------------

terminate(Reason, #state{service_name = Name} = S) ->
    send_event(Name, stop),
    ets:delete(?STATE_TABLE, Name),
    shutdown == Reason  %% application shutdown
        andalso shutdown(application, S).

%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------

code_change(FromVsn,
            #state{service_name = SvcName,
                   service = #diameter_service{applications = Apps}}
            = S,
            Extra) ->
    lists:foreach(fun(A) ->
                          code_change(FromVsn, SvcName, Extra, A)
                  end,
                  Apps),
    {ok, S}.

code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
    {ok, S} = cb(A, code_change, [FromVsn,
                                  mod_state(Alias),
                                  Extra,
                                  SvcName]),
    mod_state(Alias, S).

%% ===========================================================================
%% ===========================================================================

unexpected(F, A, #state{service_name = Name}) ->
    ?UNEXPECTED(F, A ++ [Name]).

cb(#diameter_app{module = [_|_] = M}, F, A) ->
    eval(M, F, A).

eval([M|X], F, A) ->
    apply(M, F, A ++ X).

%% Callback with state.

state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
         pick_peer = F,
         A) ->
    eval(ModX, F, A ++ [S]);

state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
    eval(ModX, F, A ++ [mod_state(Alias)]).

choose(true, X, _)  -> X;
choose(false, _, X) -> X.

ets_new(Tbl) ->
    ets:new(Tbl, [{keypos, 2}]).

insert(Tbl, Rec) ->
    ets:insert(Tbl, Rec),
    Rec.

%% Using the process dictionary for the callback state was initially
%% just a way to make what was horrendous trace (big state record and
%% much else everywhere) somewhat more readable. There's not as much
%% need for it now but it's no worse (except possibly that we don't
%% see the table identifier being passed around) than an ets table so
%% keep it.

mod_state(Alias) ->
    get({?MODULE, mod_state, Alias}).

mod_state(Alias, ModS) ->
    put({?MODULE, mod_state, Alias}, ModS).

%% ---------------------------------------------------------------------------
%% # shutdown/2
%% ---------------------------------------------------------------------------

%% remove_transport
shutdown(Refs, #state{watchdogT = WatchdogT})
  when is_list(Refs) ->
    ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);

%% application/service shutdown
shutdown(Reason, #state{watchdogT = WatchdogT})
  when Reason == application;
       Reason == service ->
    diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
                                [],
                                WatchdogT)).

%% st/2

st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
    lists:member(Ref, Refs)
        andalso (Pid ! {shutdown, self(), transport}).  %% 'DOWN' cleans up

%% st/3

st(#watchdog{pid = Pid}, Reason, Acc) ->
    Pid ! {shutdown, self(), Reason},
    [Pid | Acc].

%% ---------------------------------------------------------------------------
%% # call_service/2
%% ---------------------------------------------------------------------------

call_service(Pid, Req)
  when is_pid(Pid) ->
    cs(Pid, Req);
call_service(SvcName, Req) ->
    call_service_by_name(SvcName, Req).

call_service_by_name(SvcName, Req) ->
    cs(whois(SvcName), Req).

cs(Pid, Req)
  when is_pid(Pid) ->
    try
        gen_server:call(Pid, Req, infinity)
    catch
        E: Reason when E == exit ->
            {error, {E, Reason}}
    end;

cs(undefined, _) ->
    {error, no_service}.

%% ---------------------------------------------------------------------------
%% # i/1
%% ---------------------------------------------------------------------------

%% Initialize the state of a service gen_server.

i(SvcName) ->
    %% Split the config into a server state and a list of transports.
    {#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
                                     {false, []},
                                     diameter_config:lookup(SvcName)),

    %% Publish the state in order to be able to access it outside of
    %% the service process. Originally table identifiers were only
    %% known to the service process but we now want to provide the
    %% option of application callbacks being 'stateless' in order to
    %% avoid having to go through a common process. (Eg. An agent that
    %% sends a request for every incoming request.)
    true = ets:insert_new(?STATE_TABLE, S),

    %% Start fsms for each transport.
    send_event(SvcName, start),
    lists:foreach(fun(T) -> start_fsm(T,S) end, CL),

    init_shared(S),
    S.

cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
        {false, Acc}) ->
    lists:foreach(fun init_mod/1, Apps),
    S = #state{service_name = SvcName,
               service = Rec#diameter_service{pid = self()},
               monitor = mref(get_value(monitor, Opts)),
               options = service_options(Opts)},
    {S, Acc};

cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
  when Type == connect;
       Type == listen ->
    {S, [T | Acc]}.

service_options(Opts) ->
    [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
     {share_peers, get_value(share_peers, Opts)},
     {use_shared_peers, get_value(use_shared_peers, Opts)},
     {restrict_connections, proplists:get_value(restrict_connections,
                                                Opts,
                                                ?RESTRICT)}].
%% The order of options is significant since we match against the list.

mref(false = No) ->
    No;
mref(P) ->
    erlang:monitor(process, P).

init_shared(#state{options = [_, _, {_, true} | _],
                   service_name = Svc}) ->
    diameter_peer:notify(Svc, {service, self()});
init_shared(#state{options = [_, _, {_, false} | _]}) ->
    ok.

init_mod(#diameter_app{alias = Alias,
                       init_state = S}) ->
    mod_state(Alias, S).

start_fsm({Ref, Type, Opts}, S) ->
    start(Ref, {Type, Opts}, S).

get_value(Key, Vs) ->
    {_, V} = lists:keyfind(Key, 1, Vs),
    V.

%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------

%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
%% since they involve the same call to merge_service/2. We merge here
%% rather than earlier since the service may not yet be configured
%% when the transport is configured.

start(Ref, {T, Opts}, S)
  when T == connect;
       T == listen ->
    try
        {ok, start(Ref, type(T), Opts, S)}
    catch
        ?FAILURE(Reason) ->
            {error, Reason}
    end.
%% TODO: don't actually raise any errors yet

%% There used to be a difference here between the handling of
%% configured listening and connecting transports but now we simply
%% tell the transport_module to start an accepting or connecting
%% process respectively, the transport implementation initiating
%% listening on a port as required.
type(listen)      -> accept;
type(accept)      -> listen;
type(connect = T) -> T.

%% start/4

start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
                              peerT = PeerT,
                              options = SvcOpts,
                              service_name = SvcName,
                              service = Svc0})
  when Type == connect;
       Type == accept ->
    #diameter_service{applications = Apps}
        = Svc
        = merge_service(Opts, Svc0),
    {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
    Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName,
                                                        PeerT,
                                                        Apps,
                                                        Mask]),
                        Opts,
                        SvcOpts,
                        Svc}),
    insert(WatchdogT, #watchdog{pid = Pid,
                                type = Type,
                                ref = Ref,
                                options = Opts}),
    Pid.

%% Note that the service record passed into the watchdog is the merged
%% record so that each watchdog may get a different record. This
%% record is what is passed back into application callbacks.

s(Type, Ref, T) ->
    {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
    Pid.

%% merge_service/2

merge_service(Opts, Svc) ->
    lists:foldl(fun ms/2, Svc, Opts).

%% Limit the applications known to the fsm to those in the 'apps'
%% option. That this might be empty is checked by the fsm. It's not
%% checked at config-time since there's no requirement that the
%% service be configured first. (Which could be considered a bit odd.)
ms({applications, As}, #diameter_service{applications = Apps} = S)
  when is_list(As) ->
    S#diameter_service{applications
                       = [A || A <- Apps,
                               lists:member(A#diameter_app.alias, As)]};

%% The fact that all capabilities can be configured on the transports
%% means that the service doesn't necessarily represent a single
%% locally implemented Diameter peer as identified by Origin-Host: a
%% transport can configure its own Origin-Host. This means that the
%% service is little more than a placeholder for default capabilities
%% plus a list of applications that individual transports can choose
%% to support (or not).
ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
  when is_list(Opts) ->
    %% make_caps has already succeeded in diameter_config so it will succeed
    %% again here.
    {ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
    Svc#diameter_service{capabilities = Caps};

ms(_, Svc) ->
    Svc.

%% ---------------------------------------------------------------------------
%% # accepted/3
%% ---------------------------------------------------------------------------

accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
    #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
        = Wd
        = fetch(WatchdogT, Pid),
    insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
    start(Ref, T, Opts, S).                     %% start new watchdog

fetch(Tid, Key) ->
    [T] = ets:lookup(Tid, Key),
    T.

%% ---------------------------------------------------------------------------
%% # watchdog/6
%%
%% React to a watchdog state transition.
%% ---------------------------------------------------------------------------

%% Watchdog has a new open connection.
watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
    connection_up({TPid, T}, Wd, State);

%% Watchdog has a new connection that will be opened after DW[RA]
%% exchange.
watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
    reopen({TPid, T}, Wd, State);

%% Watchdog has recovered a suspect connection.
watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
    #watchdog{peer = TPid} = Wd,  %% assert
    connection_up(Wd, State);

%% Watchdog has an unresponsive connection.
watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
    #watchdog{peer = TPid} = Wd,  %% assert
    connection_down(Wd, To, State);

%% Watchdog has lost its connection.
watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
    close(Wd, S),
    connection_down(Wd, To, S),
    ets:delete(PeerT, TPid);

watchdog(_, [], _, _, _, _) ->
    ok.

%% ---------------------------------------------------------------------------
%% # connection_up/3
%% ---------------------------------------------------------------------------

%% Watchdog process has reached state OKAY.

connection_up({TPid, {Caps, SupportedApps, Pkt}},
              #watchdog{pid = Pid}
              = Wd,
              #state{peerT = PeerT}
              = S) ->
    Pr = #peer{pid = TPid,
               apps = SupportedApps,
               caps = Caps,
               watchdog = Pid},
    insert(PeerT, Pr),
    connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).

%% ---------------------------------------------------------------------------
%% # reopen/3
%% ---------------------------------------------------------------------------

reopen({TPid, {Caps, SupportedApps, _Pkt}},
       #watchdog{pid = Pid}
       = Wd,
       #state{watchdogT = WatchdogT,
              peerT = PeerT}) ->
    insert(PeerT, #peer{pid = TPid,
                        apps = SupportedApps,
                        caps = Caps,
                        watchdog = Pid}),
    insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
                                  peer = TPid}).

%% ---------------------------------------------------------------------------
%% # connection_up/2
%% ---------------------------------------------------------------------------

%% Watchdog has recovered a suspect connection. Note that there has
%% been no new capabilities exchange in this case.

connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
    connection_up([], Wd, fetch(PeerT, TPid), S).

%% connection_up/4

connection_up(Extra,
              #watchdog{peer = TPid}
              = Wd,
              #peer{apps = SApps, caps = Caps}
              = Pr,
              #state{watchdogT = WatchdogT,
                     local_peers = LDict,
                     service_name = SvcName,
                     service = #diameter_service{applications = Apps}}
              = S) ->
    insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
    diameter_traffic:peer_up(TPid),
    insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
    report_status(up, Wd, Pr, S, Extra).

insert_local_peer(SApps, T, LDict) ->
    lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).

ilp({Id, Alias}, {TC, SA}, LDict) ->
    init_conn(Id, Alias, TC, SA),
    ?Dict:append(Alias, TC, LDict).

init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
    #diameter_app{id = Id}  %% assert
        = App
        = find_app(Alias, Apps),

    peer_cb(App, peer_up, [SvcName, TC])
        orelse exit(TPid, kill).  %% fake transport failure

%% ---------------------------------------------------------------------------
%% # find_incoming_app/3
%% ---------------------------------------------------------------------------

%% No one should be sending the relay identifier.
find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
    Caps;

find_incoming_app(Peer, Id, Apps)
  when is_integer(Id) ->
    find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);

%% Note that the apps represented in SApps may be a strict subset of
%% those in Apps.
find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
    case keyfind(Ids, 1, SApps) of
        {_Id, Alias} ->
            {#diameter_app{} = find_app(Alias, Apps), Caps};
        false ->
            Caps
    end.

%% keyfind/3

keyfind([], _, _) ->
    false;
keyfind([Key | Rest], Pos, L) ->
    case lists:keyfind(Key, Pos, L) of
        false ->
            keyfind(Rest, Pos, L);
        T ->
            T
    end.

%% find_outgoing_app/2

find_outgoing_app(Alias, Apps) ->
    case find_app(Alias, Apps) of
        #diameter_app{id = ?APP_ID_RELAY} ->
            false;
        A ->
            A
    end.

%% find_app/2

find_app(Alias, Apps) ->
    lists:keyfind(Alias, #diameter_app.alias, Apps).

%% Don't bring down the service (and all associated connections)
%% regardless of what happens.
peer_cb(App, F, A) ->
    try state_cb(App, F, A) of
        ModS ->
            mod_state(App#diameter_app.alias, ModS),
            true
    catch
        E:R ->
            diameter_lib:error_report({failure, {E, R, ?STACK}},
                                      {App, F, A}),
            false
    end.

%% ---------------------------------------------------------------------------
%% # connection_down/3
%% ---------------------------------------------------------------------------

connection_down(#watchdog{state = ?WD_OKAY,
                          peer = TPid}
                = Wd,
                #peer{caps = Caps,
                      apps = SApps}
                = Pr,
                #state{service_name = SvcName,
                       service = #diameter_service{applications = Apps},
                       local_peers = LDict}
                = S) ->
    report_status(down, Wd, Pr, S, []),
    remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
    diameter_traffic:peer_down(TPid);

connection_down(#watchdog{}, #peer{}, _) ->
    ok;

connection_down(#watchdog{state = ?WD_OKAY,
                          peer = TPid}
                = Wd,
                To,
                #state{watchdogT = WatchdogT,
                       peerT = PeerT}
                = S)
  when is_atom(To) ->
    insert(WatchdogT, Wd#watchdog{state = To}),
    connection_down(Wd, fetch(PeerT, TPid), S);

connection_down(#watchdog{},
                To,
                #state{})
  when is_atom(To) ->
    ok.     

remove_local_peer(SApps, T, LDict) ->
    lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).

rlp({Id, Alias}, {TC, SA}, LDict) ->
    L = ?Dict:fetch(Alias, LDict),
    down_conn(Id, Alias, TC, SA),
    ?Dict:store(Alias, lists:delete(TC, L), LDict).

down_conn(Id, Alias, TC, {SvcName, Apps}) ->
    #diameter_app{id = Id}  %% assert
        = App
        = find_app(Alias, Apps),

    peer_cb(App, peer_down, [SvcName, TC]).

%% ---------------------------------------------------------------------------
%% # watchdog_down/2
%% ---------------------------------------------------------------------------

%% Watchdog process has died.

watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
    Wd = fetch(WatchdogT, Pid),
    ets:delete_object(WatchdogT, Wd),
    restart(Wd,S),
    wd_down(Wd,S).

%% Watchdog has never reached OKAY ...
wd_down(#watchdog{peer = B}, _)
  when is_boolean(B) ->
    ok;

%% ... or maybe it has.
wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
    connection_down(Wd, ?WD_DOWN, S),
    ets:delete(PeerT, TPid).

%% restart/2

restart(Wd, S) ->
    q_restart(restart(Wd), S).

%% restart/1

%% Always try to reconnect.
restart(#watchdog{ref = Ref,
                  type = connect = T,
                  options = Opts,
                  started = Time}) ->
    {Time, {Ref, T, Opts}};

%% Transport connection hasn't yet been accepted ...
restart(#watchdog{ref = Ref,
                  type = accept = T,
                  options = Opts,
                  peer = false,
                  started = Time}) ->
    {Time, {Ref, T, Opts}};

%% ... or it has: a replacement has already been spawned.
restart(#watchdog{type = accept}) ->
    false.

%% q_restart/2

%% Start the reconnect timer.
q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
    start_tc(tc(Time, default_tc(Type, Opts)), T, S);
q_restart(false, _) ->
    ok.

%% RFC 3588, 2.1:
%%
%%   When no transport connection exists with a peer, an attempt to
%%   connect SHOULD be periodically made.  This behavior is handled via
%%   the Tc timer, whose recommended value is 30 seconds.  There are
%%   certain exceptions to this rule, such as when a peer has terminated
%%   the transport connection stating that it does not wish to
%%   communicate.

default_tc(connect, Opts) ->
    proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
default_tc(accept, _) ->
    0.

%% Bound tc below if the watchdog was restarted recently to avoid
%% continuous restarts in case of faulty config or other problems.
tc(Time, Tc) ->
    choose(Tc > ?RESTART_TC
             orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
           Tc,
           ?RESTART_TC).

start_tc(0, T, S) ->
    tc_timeout(T, S);
start_tc(Tc, T, _) ->
    erlang:send_after(Tc, self(), {tc_timeout, T}).

%% tc_timeout/2

tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
    tc(diameter_config:have_transport(SvcName, Ref), T, S).

tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
                            = S) ->
    send_event(SvcName, {reconnect, Ref, Opts}),
    start(Ref, Type, Opts, S);
tc(false = No, _, _) ->  %% removed
    No.

%% ---------------------------------------------------------------------------
%% # close/2
%% ---------------------------------------------------------------------------

%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
%% another watchdog to be able to detect that it should transition
%% from initial into reopen rather than okay. That someone is either
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.

close(#watchdog{type = connect}, _) ->
    ok;
close(#watchdog{type = accept,
                pid = Pid,
                ref = Ref,
                options = Opts},
      #state{service_name = SvcName}) ->
    c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).

%% Tell watchdog to (maybe) die later ...
c(Pid, true, Opts) ->
    Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
    erlang:send_after(Tc, Pid, close);

%% ... or now.
c(Pid, false, _Opts) ->
    Pid ! close.

%% The RFCs only document the behaviour of Tc, our reconnect_timer,
%% for the establishment of connections but we also give
%% reconnect_timer semantics for a listener, being the time within
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.

%% ---------------------------------------------------------------------------
%% # reconnect/2
%% ---------------------------------------------------------------------------

reconnect(Pid, #state{service_name = SvcName,
                      watchdogT = WatchdogT}) ->
    #watchdog{ref = Ref,
              type = connect,
              options = Opts}
        = fetch(WatchdogT, Pid),
    send_event(SvcName, {reconnect, Ref, Opts}).

%% ---------------------------------------------------------------------------
%% # call_module/4
%% ---------------------------------------------------------------------------

%% Backwards compatibility and never documented/advertised. May be
%% removed.

call_module(Mod, Req, From, #state{service
                                   = #diameter_service{applications = Apps},
                                   service_name = Svc}
                            = S) ->
    case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
            Req,
            From,
            Svc)
    of
        {reply = T, RC} ->
            {T, RC, S};
        noreply = T ->
            {T, S};
        Reason ->
            {reply, {error, Reason}, S}
    end.

cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
    Args = [Req, From, Svc],

    try state_cb(App, handle_call, Args) of
        {noreply = T, ModS} ->
            mod_state(Alias, ModS),
            T;
        {reply = T, RC, ModS} ->
            mod_state(Alias, ModS),
            {T, RC};
        T ->
            diameter_lib:error_report({invalid, T},
                                      {App, handle_call, Args}),
            invalid
    catch
        E: Reason ->
            diameter_lib:error_report({failure, {E, Reason, ?STACK}},
                                      {App, handle_call, Args}),
            failure
    end;

cm([], _, _, _) ->
    unknown;

cm([_,_|_], _, _, _) ->
    multiple.

%% ---------------------------------------------------------------------------
%% # report_status/5
%% ---------------------------------------------------------------------------

report_status(Status,
              #watchdog{ref = Ref,
                        peer = TPid,
                        type = Type,
                        options = Opts},
              #peer{apps = [_|_] = As,
                    caps = Caps},
              #state{service_name = SvcName}
              = S,
              Extra) ->
    share_peer(Status, Caps, As, TPid, S),
    Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
    send_event(SvcName, list_to_tuple(Info)).

%% send_event/2

send_event(SvcName, Info) ->
    send_event(#diameter_event{service = SvcName,
                               info = Info}).

send_event(#diameter_event{service = SvcName} = E) ->
    lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).

%% ---------------------------------------------------------------------------
%% # share_peer/5
%% ---------------------------------------------------------------------------

share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
                                           service_name = Svc}) ->
    diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});

share_peer(_, _, _, _, _) ->
    ok.

%% ---------------------------------------------------------------------------
%% # share_peers/2
%% ---------------------------------------------------------------------------

share_peers(Pid, #state{options = [_, {_, true} | _],
                        local_peers = PDict}) ->
    ?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);

share_peers(_, _) ->
    ok.

sp(Pid, Alias, Peers) ->
    lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).

%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------

remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
                                          service = Svc,
                                          shared_peers = PDict}) ->
    #diameter_service{applications = Apps} = Svc,
    Key = #diameter_app.alias,
    As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
    rpu(Pid, Caps, PDict, As);

remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
    ok.

rpu(_, _, PDict, []) ->
    PDict;
rpu(Pid, Caps, PDict, Aliases) ->
    erlang:monitor(process, Pid),
    T = {Pid, Caps},
    lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).

%% ---------------------------------------------------------------------------
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------

remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
                             shared_peers = PDict}) ->
    lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).

rpd(Pid, Alias, PDict) ->
    ?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).

%% ---------------------------------------------------------------------------
%% pick_peer/4
%% ---------------------------------------------------------------------------

pick_peer(#diameter_app{alias = Alias}
          = App,
          RealmAndHost,
          Filter,
          #state{local_peers = L,
                 shared_peers = S,
                 service_name = SvcName,
                 service = #diameter_service{pid = Pid}}) ->
    pick_peer(peers(Alias, RealmAndHost, Filter, L),
              peers(Alias, RealmAndHost, Filter, S),
              Pid,
              SvcName,
              App).

%% pick_peer/5

pick_peer([], [], _, _, _) ->
    false;

%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
  when self() /= Pid ->
    case call_service(Pid, {pick_peer, Local, Remote, App}) of
        {TPid, _} = T when is_pid(TPid) ->
            T;
        {error, _} ->
            false
    end;

%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
pick_peer(Local,
          Remote,
          _Pid,
          SvcName,
          #diameter_app{alias = Alias,
                        init_state = S,
                        mutable = M}
          = App) ->
    Args = [Local, Remote, SvcName],

    try state_cb(App, pick_peer, Args) of
        {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
            T;
        {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
            mod_state(Alias, ModS),
            T;
        {false = No, ModS} when M ->
            mod_state(Alias, ModS),
            No;
        {ok, false = No} ->
            No;
        false = No ->
            No;
        {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
            T;                     %% Accept returned state in the immutable
        {false = No, S} ->         %% case as long as it isn't changed.
            No;
        T ->
            diameter_lib:error_report({invalid, T, App},
                                      {App, pick_peer, Args})
    catch
        E: Reason ->
            diameter_lib:error_report({failure, {E, Reason, ?STACK}},
                                      {App, pick_peer, Args})
    end.

%% peers/4

peers(Alias, RH, Filter, Peers) ->
    case ?Dict:find(Alias, Peers) of
        {ok, L} ->
            ps(L, RH, Filter, {[],[]});
        error ->
            []
    end.

%% Place a peer whose Destination-Host/Realm matches those of the
%% request at the front of the result list. Could add some sort of
%% 'sort' option to allow more control.

ps([], _, _, {Ys, Ns}) ->
    lists:reverse(Ys, Ns);
ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
    ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
                              caps_filter(Caps, RH, {all, [host, realm]}),
                              TC,
                              Acc)).

pacc(true, true, Peer, {Ts, Fs}) ->
    {[Peer|Ts], Fs};
pacc(true, false, Peer, {Ts, Fs}) ->
    {Ts, [Peer|Fs]};
pacc(_, _, _, Acc) ->
    Acc.

%% caps_filter/3

caps_filter(C, RH, {neg, F}) ->
    not caps_filter(C, RH, F);

caps_filter(C, RH, {all, L})
  when is_list(L) ->
    lists:all(fun(F) -> caps_filter(C, RH, F) end, L);

caps_filter(C, RH, {any, L})
  when is_list(L) ->
    lists:any(fun(F) -> caps_filter(C, RH, F) end, L);

caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
    eq(undefined, DH, OH);

caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
    eq(undefined, DR, OR);

caps_filter(C, _, Filter) ->
    caps_filter(C, Filter).

%% caps_filter/2

caps_filter(_, none) ->
    true;

caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
    eq(any, H, OH);

caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
    eq(any, R, OR);

%% Anything else is expected to be an eval filter. Filter failure is
%% documented as being equivalent to a non-matching filter.

caps_filter(C, T) ->
    try
        {eval, F} = T,
        diameter_lib:eval([F,C])
    catch
        _:_ -> false
    end.

eq(Any, Id, PeerId) ->
    Any == Id orelse try
                         iolist_to_binary(Id) == iolist_to_binary(PeerId)
                     catch
                         _:_ -> false
                     end.
%% OctetString() can be specified as an iolist() so test for string
%% rather than term equality.

%% transports/1

transports(#state{watchdogT = WatchdogT}) ->
    ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
                        [{'is_pid', '$1'}],
                        ['$1']}]).

%% ---------------------------------------------------------------------------
%% # service_info/2
%% ---------------------------------------------------------------------------

%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
                   'Origin-Realm',
                   'Vendor-Id',
                   'Product-Name',
                   'Origin-State-Id',
                   'Host-IP-Address',
                   'Supported-Vendor-Id',
                   'Auth-Application-Id',
                   'Inband-Security-Id',
                   'Acct-Application-Id',
                   'Vendor-Specific-Application-Id',
                   'Firmware-Revision']).

%% The config returned by diameter:service_info(SvcName, all).
-define(ALL_INFO, [capabilities,
                   applications,
                   transport,
                   pending,
                   options]).

%% The rest.
-define(OTHER_INFO, [connections,
                     name,
                     peers,
                     statistics]).

service_info(Item, S)
  when is_atom(Item) ->
    case tagged_info(Item, S) of
        {_, T} -> T;
        undefined = No -> No
    end;

service_info(Items, S) ->
    tagged_info(Items, S).

tagged_info(Item, S)
  when is_atom(Item) ->
    case complete(Item) of
        {value, I} ->
            {I, complete_info(I,S)};
        false ->
            undefined
    end;

tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
  when is_pid(TPid) ->
    try
        [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
        [#watchdog{ref = Ref, type = Type, options = Opts}]
            = ets:lookup(WatchdogT, Pid),
        [{ref, Ref},
         {type, Type},
         {options, Opts}]
    catch
        error:_ ->
            []
    end;

tagged_info(Items, S)
  when is_list(Items) ->
    [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];

tagged_info(_, _) ->
    undefined.

complete_info(Item, #state{service = Svc} = S) ->
    case Item of
        name ->
            S#state.service_name;
        'Origin-Host' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_host;
        'Origin-Realm' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_realm;
        'Vendor-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.vendor_id;
        'Product-Name' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.product_name;
        'Origin-State-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.origin_state_id;
        'Host-IP-Address' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.host_ip_address;
        'Supported-Vendor-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.supported_vendor_id;
        'Auth-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.auth_application_id;
        'Inband-Security-Id'  ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.inband_security_id;
        'Acct-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.acct_application_id;
        'Vendor-Specific-Application-Id' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.vendor_specific_application_id;
        'Firmware-Revision' ->
            (Svc#diameter_service.capabilities)
                #diameter_caps.firmware_revision;
        capabilities -> service_info(?CAP_INFO, S);
        applications -> info_apps(S);
        transport    -> info_transport(S);
        options      -> info_options(S);
        pending      -> info_pending(S);
        keys         -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
        all          -> service_info(?ALL_INFO, S);
        statistics   -> info_stats(S);
        connections  -> info_connections(S);
        peers        -> info_peers(S)
    end.

complete(I)
  when I == keys;
       I == all ->
    {value, I};
complete(Pre) ->
    P = atom_to_list(Pre),
    case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO,
               lists:prefix(P, atom_to_list(I))]
    of
        [I] -> {value, I};
        _   -> false
    end.
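
%% Illustrative sketch (not part of the module): complete/1 lets an info
%% item be abbreviated to any unambiguous prefix of the known keys, for
%% example
%%
%%   complete(tran) = {value, transport}
%%   complete(p)    = false          %% ambiguous: pending | peers
%%
%% and a failed completion makes service_info/2 above return undefined.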

%% info_stats/1

info_stats(#state{watchdogT = WatchdogT}) ->
    MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
                  [{'is_pid', '$2'}],
                  [['$1', '$2']]}],
    try ets:select(WatchdogT, MatchSpec) of
        L ->
            diameter_stats:read(lists:append(L))
    catch
        error: badarg -> []  %% service has gone down
    end.

%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
%% the accumulated values for the ref and associated watchdog/peer
%% pids.

info_transport(S) ->
    PeerD = peer_dict(S, config_dict(S)),
    RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
                     PeerD),
    Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
                                  [],
                                  RefsD)),
    Stats = diameter_stats:read(Refs),
    dict:fold(fun(R, Ls, A) ->
                      Ps = dict:fetch(R, RefsD),
                      [[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
                       | A]
              end,
              [],
              PeerD).

%% Only a config entry for a listening transport: use it.
transport([[{type, listen}, _] = L]) ->
    L ++ [{accept, []}];

%% Only one config or peer entry for a connecting transport: use it.
transport([[{type, connect} | _] = L]) ->
    L;

%% Peer entries: discard config. Note that the peer entries have
%% length at least 3.
transport([[_,_] | L]) ->
    transport(L);

%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
    [{type, listen},
     {options, Opts},
     {accept, [lists:nthtail(2,L) || L <- Ls]}].
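
%% Illustrative sketch (not part of the module): for a listening
%% transport with one established connection plus an accepting watchdog
%% still in state initial, the corresponding entry in
%% diameter:service_info(SvcName, transport) has roughly this shape
%% (values elided):
%%
%%   [{ref, TRef},
%%    {type, listen},
%%    {options, TOpts},
%%    {accept, [[{watchdog, {WPid1, Started1, okay}},
%%               {peer, {TPid, PStarted}},
%%               {apps, [...]},
%%               {caps, [...]},
%%               {port, [...]}],
%%              [{watchdog, {WPid2, Started2, initial}}]]},
%%    {statistics, [...]}]
%%
%% A watchdog without a living peer contributes only its watchdog tuple.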

peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
    try ets:tab2list(WatchdogT) of
        L ->
            lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
    catch
        error: badarg -> Dict0  %% service has gone down
    end.

peer_acc(PeerT, Acc, #watchdog{pid = Pid,
                               type = Type,
                               ref = Ref,
                               options = Opts,
                               state = WS,
                               started = At,
                               peer = TPid}) ->
    dict:append(Ref,
                [{type, Type},
                 {options, Opts},
                 {watchdog, {Pid, At, WS}}
                 | info_peer(PeerT, TPid, WS)],
                Acc).

info_peer(PeerT, TPid, WS)
  when is_pid(TPid), WS /= ?WD_DOWN ->
    try ets:lookup(PeerT, TPid) of
        T -> info_peer(T)
    catch
        error: badarg -> []  %% service has gone down
    end;
info_peer(_, _, _) ->
    [].

%% The point of extracting the config here is so that 'transport' info
%% has one entry for each transport ref, the peer table only
%% containing entries that have a living watchdog.

config_dict(#state{service_name = SvcName}) ->
    lists:foldl(fun config_acc/2,
                dict:new(),
                diameter_config:lookup(SvcName)).

config_acc({Ref, T, Opts}, Dict)
  when T == listen;
       T == connect ->
    dict:store(Ref, [[{type, T}, {options, Opts}]], Dict);
config_acc(_, Dict) ->
    Dict.
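
%% Illustrative sketch (not part of the module): seeding the dict from
%% diameter_config:lookup/1 guarantees one 'transport' entry per
%% configured transport reference even when the watchdog table holds
%% nothing for that reference, in which case the entry is just the bare
%% configuration, roughly
%%
%%   [{ref, TRef}, {type, connect}, {options, TOpts}, {statistics, [...]}]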

info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
    [{peer, {Pid, T}},
     {apps, SApps},
     {caps, info_caps(Caps)}
     | try [{port, info_port(Pid)}] catch _:_ -> [] end];
info_peer([] = No) ->
    No.

%% Extract information that the processes involved are expected to
%% "publish" in their process dictionaries. Simple but backhanded.
info_port(Pid) ->
    {_, PD} = process_info(Pid, dictionary),
    {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
    {TPid, {_Type, TMod, _Cfg}} = T,
    {_, TD} = process_info(TPid, dictionary),
    {_, Data} = lists:keyfind({TMod, info}, 1, TD),
    [{owner, TPid},
     {module, TMod}
     | try TMod:info(Data) catch _:_ -> [] end].
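
%% Illustrative sketch (not part of the module): the keys read here are
%% assumed to have been published along these lines, with diameter_tcp
%% standing in for the transport module:
%%
%%   %% in the peer fsm process
%%   put({diameter_peer_fsm, start}, {TPid, {Type, diameter_tcp, Cfg}}),
%%
%%   %% in the transport process
%%   put({diameter_tcp, info}, Data)
%%
%% which is what the two lists:keyfind/3 calls above pick up.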

%% Use the fields names from diameter_caps instead of
%% diameter_base_CER to distinguish between the 2-tuple values
%% compared to the single capabilities values. Note also that the
%% returned list is tagged 'caps' rather than 'capabilities' to
%% emphasize the difference.
info_caps(#diameter_caps{} = C) ->
    lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
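
%% Illustrative sketch (not part of the module): record_info/2 expands to
%% the field names at compile time, so for a record such as
%%
%%   -record(r, {a, b}).
%%
%% the same construction gives
%%
%%   lists:zip(record_info(fields, r), tl(tuple_to_list(#r{a = 1, b = 2})))
%%     = [{a, 1}, {b, 2}]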

info_apps(#state{service = #diameter_service{applications = Apps}}) ->
    lists:map(fun mk_app/1, Apps).

mk_app(#diameter_app{} = A) ->
    lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))).

%% info_pending/1
%%
%% One entry for each outgoing request whose answer is outstanding.

info_pending(#state{} = S) ->
    diameter_traffic:pending(transports(S)).

%% info_connections/1
%%
%% One entry per transport connection. Statistics for each entry are
%% for the peer pid only.

info_connections(S) ->
    ConnL = conn_list(S),
    Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]),
    [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L].

conn_list(S) ->
    lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S, dict:new()))).

conn_acc(Ref, Peers, Acc) ->
    [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)]
     | Acc].

stats(Refs, Stats) ->
    {statistics, dict:to_list(lists:foldl(fun(R,D) ->
                                                  stats_acc(R, D, Stats)
                                          end,
                                          dict:new(),
                                          Refs))}.

stats_acc(Ref, Dict, Stats) ->
    lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end,
                Dict,
                proplists:get_value(Ref, Stats, [])).
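
%% Illustrative sketch (not part of the module): counters read for the
%% given Refs (transport references and/or pids) are summed per counter
%% key, for example
%%
%%   stats([ref1, pid1], [{ref1, [{c, 1}]}, {pid1, [{c, 2}]}])
%%     = {statistics, [{c, 3}]}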

%% info_peers/1
%%
%% One entry per peer Origin-Host. Statistics for each entry are
%% accumulated values for all peer pids.

info_peers(S) ->
    {PeerD, RefD} = lists:foldl(fun peer_acc/2,
                                {dict:new(), dict:new()},
                                conn_list(S)),
    Refs = lists:append(dict:fold(fun(_, Rs, A) -> [Rs|A] end,
                                  [],
                                  RefD)),
    Stats = diameter_stats:read(Refs),
    dict:fold(fun(OH, Cs, A) ->
                      Rs = dict:fetch(OH, RefD),
                      [{OH, [{connections, Cs}, stats(Rs, Stats)]} | A]
              end,
              [],
              PeerD).

peer_acc(Peer, {PeerD, RefD}) ->
    [{TPid, _}, [{origin_host, {_, OH}} | _]]
        = [proplists:get_value(K, Peer) || K <- [peer, caps]],
    {dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}.
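
%% Illustrative sketch (not part of the module): connections are grouped
%% on the peer's Origin-Host, so two connections from the same peer give
%% a single entry of roughly this form (the Origin-Host value shown as a
%% binary for illustration only):
%%
%%   {<<"peer.example.com">>,
%%    [{connections, [Conn1, Conn2]},
%%     {statistics, [...]}]}
%%
%% with the statistics accumulated over both transport pids.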

%% info_options/1

info_options(S) ->
    S#state.options.