[erlang-bugs] Memory leak in diameter_service module in diameter app (otp_R16B)
Aleksander Nycz
Aleksander.Nycz@REDACTED
Mon May 20 11:45:16 CEST 2013
Hello,
I think there is a memory leak in the diameter_service module.
This module is a gen_server whose state contains the field watchdogT ::
ets:tid().
This ets table holds information about the watchdogs.
The diameter service configuration is (note restrict_connections set to false):
[{'Origin-Host', HostName},
{'Origin-Realm', Realm},
{'Vendor-Id', ...},
{'Product-Name', ...},
{'Auth-Application-Id', [?DCCA_APP_ID]},
{'Supported-Vendor-Id', [...]},
{application, [{alias, diameterNode},
{dictionary, dictionaryDCCA},
{module, dccaCallback}]},
{restrict_connections, false}]
After starting the diameter app and adding the service and transport, the
diameter_service state is:
> diameter_service:state(diameterNode).
#state{id = {1369,41606,329900},
service_name = diameterNode,
service = #diameter_service{pid = <0.1011.0>,
capabilities = #diameter_caps{...},
applications = [#diameter_app{...}]},
watchdogT = 4194395,peerT = 4259932,shared_peers = 4325469,
local_peers = 4391006,monitor = false,
options = [{sequence,{0,32}},
{share_peers,false},
{use_shared_peers,false},
{restrict_connections,false}]}
and ets table 4194395 has one record:
> ets:tab2list(4194395).
[#watchdog{pid = <0.1013.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
state = initial,
started = {1369,41606,330086},
peer = false}]
Next I ran a very simple test using the seagull simulator. The test scenario
is as follows:
1. seagull: send CER
2. seagull: recv CEA
3. seagull: send CCR (init)
4. seagull: recv CCA (init)
5. seagull: send CCR (update)
6. seagull: recv CCA (update)
7. seagull: send CCR (terminate)
8. seagull: recv CCA (terminate)
During the test there are two watchdogs in the ets table:
> ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = initial*,
started = {1369,41823,711370},
peer = false},
#watchdog{pid = <0.1013.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = okay*,
started = {1369,41606,330086},
peer = <0.1014.0>}]
After the test, but before the tw timer has elapsed, there are still two
watchdogs, and this is ok:
> ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = initial*,
started = {1369,41823,711370},
peer = false},
#watchdog{pid = *<0.1013.0>*,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = down*,
started = {1369,41606,330086},
peer = *<0.1014.0>*}]
But once the tm timer has elapsed, the transport and watchdog processes have terminated:
> erlang:is_process_alive(list_to_pid("<0.1014.0>")).
false
> erlang:is_process_alive(list_to_pid("<0.1013.0>")).
false
and there are still two watchdogs in the ets table:
> ets:tab2list(4194395).
[#watchdog{pid = <0.1816.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
state = initial,
started = {1369,41823,711370},
peer = false},
#watchdog{pid = <0.1013.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
state = down,
started = {1369,41606,330086},
peer = <0.1014.0>}]
I think the entry for watchdog <0.1013.0> should be removed when the
watchdog process terminates.
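A minimal sketch of how such stale entries could be listed, assuming only what the output above shows: the table is keyed on element 2 of each record (the watchdog pid) and the pids are local. The module name wd_stale and the function stale/1 are hypothetical helpers for illustration and are not part of the diameter application.

```erlang
-module(wd_stale).
-export([stale/1]).

%% Return the keys (watchdog pids) of all entries whose process is no
%% longer alive. Assumes records keyed on element 2, as in the table
%% dumps above, and local pids (is_process_alive/1 requires them).
stale(Tid) ->
    [element(2, Rec) || Rec <- ets:tab2list(Tid),
                        is_pid(element(2, Rec)),
                        not is_process_alive(element(2, Rec))].
```

In the situation described above, wd_stale:stale(4194395) would be expected to list <0.1013.0> once the watchdog process has terminated, while the entry in state initial is left out because its process is still running.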
I ran the next test and now there are 3 watchdogs in the ets table:
> ets:tab2list(4194395).
[#watchdog{pid = *<0.1816.0>*,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = down*,
started = {1369,41823,711370},
peer = *<0.1817.0>*},
#watchdog{pid = <0.1013.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = down*,
started = {1369,41606,330086},
peer = <0.1014.0>},
#watchdog{pid = <0.3533.0>,type = accept,
ref = #Ref<0.0.0.1696>,
options = [{transport_module,diameter_tcp},
{transport_config,[{reuseaddr,true},
{ip,{0,0,0,0}},
{port,4068}]},
{capabilities_cb,[#Fun<diameterNode.acceptCER.2>]},
{watchdog_timer,30000},
{reconnect_timer,60000}],
*state = initial*,
started = {1369,42342,845898},
peer = false}]
The watchdog and transport processes are not alive:
> erlang:is_process_alive(list_to_pid("<0.1816.0>")).
false
> erlang:is_process_alive(list_to_pid("<0.1817.0>")).
false
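To follow the growth from one test run to the next without dumping the whole table, its size can be sampled instead. The following is a minimal sketch; the module name tab_growth and the function snapshot/1 are hypothetical, and only the standard ets:info/2 items size and memory are used.

```erlang
-module(tab_growth).
-export([snapshot/1]).

%% Return {NumberOfObjects, WordsAllocated} for an ets table.
%% ets:info(Tid, memory) reports the allocation in machine words.
snapshot(Tid) ->
    {ets:info(Tid, size), ets:info(Tid, memory)}.
```

Calling tab_growth:snapshot(4194395) after each test run would show the object count rising by one per run if entries for terminated watchdogs are never removed.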
I suggest the following change to correct this problem (file
diameter_service.erl):
$ diff diameter_service.erl diameter_service.erl_ok
1006c1006
< connection_down(#watchdog{state = WS,
---
> connection_down(#watchdog{state = ?WD_OKAY,
1015,1017c1015,1021
< ?WD_OKAY == WS
< andalso
< connection_down(Wd, fetch(PeerT, TPid), S).
---
> connection_down(Wd, fetch(PeerT, TPid), S);
>
> connection_down(#watchdog{},
> To,
> #state{})
> when is_atom(To) ->
> ok.
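The shape of this change can be illustrated in isolation: the state test moves from an andalso in the function body to a pattern in the function head, and a catch-all clause returns ok for every other state. This is a standalone sketch, not the diameter code; the module clause_sketch, the record #wd{} and the helper notify/2 are hypothetical stand-ins.

```erlang
-module(clause_sketch).
-export([down_body/2, down_head/2]).

%% Hypothetical stand-in for the #watchdog{} record.
-record(wd, {state = initial, peer = false}).

%% Before: the state is tested in the body, so any state other than
%% okay makes the function return false.
down_body(#wd{state = WS} = Wd, To) when is_atom(To) ->
    okay == WS andalso notify(Wd, To).

%% After: the state is matched in the head and a second clause
%% returns ok for every other state.
down_head(#wd{state = okay} = Wd, To) when is_atom(To) ->
    notify(Wd, To);
down_head(#wd{}, To) when is_atom(To) ->
    ok.

%% Hypothetical placeholder for the real work done on a state change.
notify(#wd{peer = Peer}, To) ->
    {notified, Peer, To}.
```

Both variants do the same work for a watchdog in state okay; they differ only in the value returned for the other states (false versus ok).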
You can find this solution in the attachment.
Regards
Aleksander Nycz
--
Aleksander Nycz
Senior Software Engineer
Telco_021 BSS R&D
Comarch SA
Phone: +48 12 646 1216
Mobile: +48 691 464 275
website: www.comarch.pl
-------------- next part --------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%% Implements the process that represents a service.
%%
-module(diameter_service).
-behaviour(gen_server).
%% towards diameter_service_sup
-export([start_link/1]).
%% towards diameter
-export([subscribe/1,
unsubscribe/1,
services/0,
info/2]).
%% towards diameter_config
-export([start/1,
stop/1,
start_transport/2,
stop_transport/2]).
%% towards diameter_peer
-export([notify/2]).
%% towards diameter_traffic
-export([find_incoming_app/4,
pick_peer/3]).
%% test/debug
-export([services/1,
subscriptions/1,
subscriptions/0,
call_module/3,
whois/1,
state/1,
uptime/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY, okay).
-define(WD_SUSPECT, suspect).
-define(WD_DOWN, down).
-define(WD_REOPEN, reopen).
-type wd_state() :: ?WD_INITIAL
| ?WD_OKAY
| ?WD_SUSPECT
| ?WD_DOWN
| ?WD_REOPEN.
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
-define(RESTART_TC, 1000). %% if restart was this recent
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change,
%% storing it in ?STATE_TABLE and not always going through the
%% service process. In particular, rely on the fact that operations on
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).
%% Maintains state in a table. In contrast to previously, a service's
%% state is now constant and is accessed outside of the service
%% process.
-define(STATE_TABLE, ?MODULE).
%% The default sequence mask.
-define(NOMASK, {0,32}).
%% The default restrict_connections.
-define(RESTRICT, nodes).
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
:: T | '_' | '$1' | '$2'.
%% State of service gen_server. Note that the state term itself
%% doesn't change, which is relevant for the stateless application
%% callbacks since the state is retrieved from ?STATE_TABLE from
%% outside the service process. The pid in the service record is used
%% to determine whether or not we need to call the process for a
%% pick_peer callback in the stateful case.
-record(state,
{id = now(),
service_name :: diameter:service_name(), %% key in ?STATE_TABLE
service :: #diameter_service{},
watchdogT = ets_new(watchdogs) %% #watchdog{} at start
:: ets:tid(),
peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
:: ets:tid(),
shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
:: ets:tid(),
local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
:: ets:tid(),
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
| {restrict_connections, diameter:restriction()}
| {share_peers, boolean()} %% broadcast peers to remote nodes?
| {use_shared_peers, boolean()}]}).%% use broadcasted peers?
%% shared_peers reflects the peers broadcast from remote nodes.
%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
-record(watchdog,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
peer = false :: match(boolean() | pid())}).
%% true at accepted, pid() at okay/reopen
%% Record representing a Peer State Machine process implemented by
%% diameter_peer_fsm.
-record(peer,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
watchdog :: pid()}). %% key into watchdogT
%% ---------------------------------------------------------------------------
%% # start/1
%% ---------------------------------------------------------------------------
start(SvcName) ->
diameter_service_sup:start_child(SvcName).
start_link(SvcName) ->
Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
gen_server:start_link(?MODULE, [SvcName], Options).
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.
%% ---------------------------------------------------------------------------
%% # stop/1
%% ---------------------------------------------------------------------------
stop(SvcName) ->
case whois(SvcName) of
undefined ->
{error, not_started};
Pid ->
stop(call_service(Pid, stop), Pid)
end.
stop(ok, Pid) ->
MRef = erlang:monitor(process, Pid),
receive {'DOWN', MRef, process, _, _} -> ok end;
stop(No, _) ->
No.
%% ---------------------------------------------------------------------------
%% # start_transport/2
%% ---------------------------------------------------------------------------
start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
call_service_by_name(SvcName, {start, T}).
%% ---------------------------------------------------------------------------
%% # stop_transport/2
%% ---------------------------------------------------------------------------
stop_transport(_, []) ->
ok;
stop_transport(SvcName, [_|_] = Refs) ->
call_service_by_name(SvcName, {stop, Refs}).
%% ---------------------------------------------------------------------------
%% # info/2
%% ---------------------------------------------------------------------------
info(SvcName, Item) ->
case lookup_state(SvcName) of
[#state{} = S] ->
service_info(Item, S);
[] ->
undefined
end.
%% lookup_state/1
lookup_state(SvcName) ->
ets:lookup(?STATE_TABLE, SvcName).
%% ---------------------------------------------------------------------------
%% # subscribe/1
%% # unsubscribe/1
%% ---------------------------------------------------------------------------
subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
unsubscribe(SvcName) ->
diameter_reg:del({?MODULE, subscriber, SvcName}).
subscriptions(Pat) ->
pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
subscriptions() ->
subscriptions('_').
pmap(Props) ->
lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
%% ---------------------------------------------------------------------------
%% # services/1
%% ---------------------------------------------------------------------------
services(Pat) ->
pmap(diameter_reg:match({?MODULE, service, Pat})).
services() ->
services('_').
whois(SvcName) ->
case diameter_reg:match({?MODULE, service, SvcName}) of
[{_, Pid}] ->
Pid;
[] ->
undefined
end.
%% ---------------------------------------------------------------------------
%% # pick_peer/3
%% ---------------------------------------------------------------------------
-spec pick_peer(SvcName, AppOrAlias, Opts)
-> {{TPid, Caps, App}, Mask}
| false
| {error, term()}
when SvcName :: diameter:service_name(),
AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
Opts :: tuple(),
TPid :: pid(),
Caps :: #diameter_caps{},
App :: #diameter_app{},
Mask :: diameter:sequence().
pick_peer(SvcName, App, Opts) ->
pick(lookup_state(SvcName), App, Opts).
pick([], _, _) ->
{error, no_service};
pick([S], App, Opts) ->
pick(S, App, Opts);
pick(#state{service = #diameter_service{applications = Apps}}
= S,
{alias, Alias},
Opts) -> %% initial call from diameter:call/4
pick(S, find_outgoing_app(Alias, Apps), Opts);
pick(_, false, _) ->
false;
pick(#state{options = [{_, Mask} | _]}
= S,
#diameter_app{module = ModX, dictionary = Dict}
= App0,
{DestF, Filter, Xtra}) ->
App = App0#diameter_app{module = ModX ++ Xtra},
[_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
case pick_peer(App, RealmAndHost, Filter, S) of
{TPid, Caps} ->
{{TPid, Caps, App}, Mask};
false = No ->
No
end.
%% ---------------------------------------------------------------------------
%% # find_incoming_app/4
%% ---------------------------------------------------------------------------
-spec find_incoming_app(PeerT, TPid, Id, Apps)
-> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app
| #diameter_caps{} %% connection but no suitable app
| false %% no connection
when PeerT :: ets:tid(),
TPid :: pid(),
Id :: non_neg_integer(),
Apps :: [#diameter_app{}].
find_incoming_app(PeerT, TPid, Id, Apps) ->
try ets:lookup(PeerT, TPid) of
[#peer{} = P] ->
find_incoming_app(P, Id, Apps);
[] -> %% transport has gone down
false
catch
error: badarg -> %% service has gone down (and taken table with it)
false
end.
%% ---------------------------------------------------------------------------
%% # notify/2
%% ---------------------------------------------------------------------------
notify(SvcName, Msg) ->
Pid = whois(SvcName),
is_pid(Pid) andalso (Pid ! Msg).
%% ===========================================================================
%% ===========================================================================
state(Svc) ->
call_service(Svc, state).
uptime(Svc) ->
call_service(Svc, uptime).
%% call_module/3
call_module(Service, AppMod, Request) ->
call_service(Service, {call_module, AppMod, Request}).
%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------
init([SvcName]) ->
process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).
i(SvcName, true) ->
{ok, i(SvcName)};
i(_, false) ->
{stop, {shutdown, already_started}}.
%% ---------------------------------------------------------------------------
%% # handle_call/3
%% ---------------------------------------------------------------------------
handle_call(state, _, S) ->
{reply, S, S};
handle_call(uptime, _, #state{id = T} = S) ->
{reply, diameter_lib:now_diff(T), S};
%% Start a transport.
handle_call({start, {Ref, Type, Opts}}, _From, S) ->
{reply, start(Ref, {Type, Opts}, S), S};
%% Stop transports.
handle_call({stop, Refs}, _From, S) ->
shutdown(Refs, S),
{reply, ok, S};
%% pick_peer with mutable state
handle_call({pick_peer, Local, Remote, App}, _From, S) ->
#diameter_app{mutable = true} = App, %% assert
{reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};
handle_call({call_module, AppMod, Req}, From, S) ->
call_module(AppMod, Req, From, S);
handle_call(stop, _From, S) ->
shutdown(service, S),
{stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
%% starting a monitor that waits for DOWN before returning.
handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
%% ---------------------------------------------------------------------------
%% # handle_cast/2
%% ---------------------------------------------------------------------------
handle_cast(Req, S) ->
unexpected(handle_cast, [Req], S),
{noreply, S}.
%% ---------------------------------------------------------------------------
%% # handle_info/2
%% ---------------------------------------------------------------------------
handle_info(T, #state{} = S) ->
case transition(T,S) of
ok ->
{noreply, S};
{stop, Reason} ->
{stop, {shutdown, Reason}, S}
end.
%% transition/2
%% Peer process is telling us to start a new accept process.
transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
%% Watchdog is sending notification of transport death.
transition({close, Pid, Reason}, #state{service_name = SvcName,
watchdogT = WatchdogT}) ->
#watchdog{state = WS,
ref = Ref,
type = Type,
options = Opts}
= fetch(WatchdogT, Pid),
WS /= ?WD_OKAY
andalso
send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
ok;
%% Watchdog is sending notification of a state transition.
transition({watchdog, Pid, {[TPid | Data], From, To}},
#state{service_name = SvcName,
watchdogT = WatchdogT}
= S) ->
#watchdog{ref = Ref, type = T, options = Opts}
= Wd
= fetch(WatchdogT, Pid),
watchdog(TPid, Data, From, To, Wd, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
%% Death of a watchdog process (#watchdog.pid) results in the removal of
%% its peer and any associated conn record when 'DOWN' is received.
%% Death of a peer process (#peer.pid, #watchdog.peer) results in
%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
%% required then someone should stop us.
transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
%% Local watchdog process has died.
transition({'DOWN', _, process, Pid, _Reason}, S)
when node(Pid) == node() ->
watchdog_down(Pid, S),
ok;
%% Remote service wants to know about shared peers.
transition({service, Pid}, S) ->
share_peers(Pid, S),
ok;
%% Remote service is communicating a shared peer.
transition({peer, TPid, Aliases, Caps}, S) ->
remote_peer_up(TPid, Aliases, Caps, S),
ok;
%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
remote_peer_down(TPid, S),
ok;
%% Restart after tc expiry.
transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
%% ---------------------------------------------------------------------------
%% # terminate/2
%% ---------------------------------------------------------------------------
terminate(Reason, #state{service_name = Name} = S) ->
send_event(Name, stop),
ets:delete(?STATE_TABLE, Name),
shutdown == Reason %% application shutdown
andalso shutdown(application, S).
%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------
code_change(FromVsn,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps}}
= S,
Extra) ->
lists:foreach(fun(A) ->
code_change(FromVsn, SvcName, Extra, A)
end,
Apps),
{ok, S}.
code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
{ok, S} = cb(A, code_change, [FromVsn,
mod_state(Alias),
Extra,
SvcName]),
mod_state(Alias, S).
%% ===========================================================================
%% ===========================================================================
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
cb(#diameter_app{module = [_|_] = M}, F, A) ->
eval(M, F, A).
eval([M|X], F, A) ->
apply(M, F, A ++ X).
%% Callback with state.
state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
pick_peer = F,
A) ->
eval(ModX, F, A ++ [S]);
state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
eval(ModX, F, A ++ [mod_state(Alias)]).
choose(true, X, _) -> X;
choose(false, _, X) -> X.
ets_new(Tbl) ->
ets:new(Tbl, [{keypos, 2}]).
insert(Tbl, Rec) ->
ets:insert(Tbl, Rec),
Rec.
%% Using the process dictionary for the callback state was initially
%% just a way to make what was horrendous trace (big state record and
%% much else everywhere) somewhat more readable. There's not as much
%% need for it now but it's no worse (except possibly that we don't
%% see the table identifier being passed around) than an ets table so
%% keep it.
mod_state(Alias) ->
get({?MODULE, mod_state, Alias}).
mod_state(Alias, ModS) ->
put({?MODULE, mod_state, Alias}, ModS).
%% ---------------------------------------------------------------------------
%% # shutdown/2
%% ---------------------------------------------------------------------------
%% remove_transport
shutdown(Refs, #state{watchdogT = WatchdogT})
when is_list(Refs) ->
ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
%% application/service shutdown
shutdown(Reason, #state{watchdogT = WatchdogT})
when Reason == application;
Reason == service ->
diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
[],
WatchdogT)).
%% st/2
st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
lists:member(Ref, Refs)
andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
%% st/3
st(#watchdog{pid = Pid}, Reason, Acc) ->
Pid ! {shutdown, self(), Reason},
[Pid | Acc].
%% ---------------------------------------------------------------------------
%% # call_service/2
%% ---------------------------------------------------------------------------
call_service(Pid, Req)
when is_pid(Pid) ->
cs(Pid, Req);
call_service(SvcName, Req) ->
call_service_by_name(SvcName, Req).
call_service_by_name(SvcName, Req) ->
cs(whois(SvcName), Req).
cs(Pid, Req)
when is_pid(Pid) ->
try
gen_server:call(Pid, Req, infinity)
catch
E: Reason when E == exit ->
{error, {E, Reason}}
end;
cs(undefined, _) ->
{error, no_service}.
%% ---------------------------------------------------------------------------
%% # i/1
%% ---------------------------------------------------------------------------
%% Initialize the state of a service gen_server.
i(SvcName) ->
%% Split the config into a server state and a list of transports.
{#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
{false, []},
diameter_config:lookup(SvcName)),
%% Publish the state in order to be able to access it outside of
%% the service process. Originally table identifiers were only
%% known to the service process but we now want to provide the
%% option of application callbacks being 'stateless' in order to
%% avoid having to go through a common process. (Eg. An agent that
%% sends a request for every incoming request.)
true = ets:insert_new(?STATE_TABLE, S),
%% Start fsms for each transport.
send_event(SvcName, start),
lists:foreach(fun(T) -> start_fsm(T,S) end, CL),
init_shared(S),
S.
cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
{false, Acc}) ->
lists:foreach(fun init_mod/1, Apps),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
monitor = mref(get_value(monitor, Opts)),
options = service_options(Opts)},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
when Type == connect;
Type == listen ->
{S, [T | Acc]}.
service_options(Opts) ->
[{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
{share_peers, get_value(share_peers, Opts)},
{use_shared_peers, get_value(use_shared_peers, Opts)},
{restrict_connections, proplists:get_value(restrict_connections,
Opts,
?RESTRICT)}].
%% The order of options is significant since we match against the list.
mref(false = No) ->
No;
mref(P) ->
erlang:monitor(process, P).
init_shared(#state{options = [_, _, {_, true} | _],
service_name = Svc}) ->
diameter_peer:notify(Svc, {service, self()});
init_shared(#state{options = [_, _, {_, false} | _]}) ->
ok.
init_mod(#diameter_app{alias = Alias,
init_state = S}) ->
mod_state(Alias, S).
start_fsm({Ref, Type, Opts}, S) ->
start(Ref, {Type, Opts}, S).
get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
%% since they involve the same call to merge_service/2. We merge here
%% rather than earlier since the service may not yet be configured
%% when the transport is configured.
start(Ref, {T, Opts}, S)
when T == connect;
T == listen ->
try
{ok, start(Ref, type(T), Opts, S)}
catch
?FAILURE(Reason) ->
{error, Reason}
end.
%% TODO: don't actually raise any errors yet
%% There used to be a difference here between the handling of
%% configured listening and connecting transports but now we simply
%% tell the transport_module to start an accepting or connecting
%% process respectively, the transport implementation initiating
%% listening on a port as required.
type(listen) -> accept;
type(accept) -> listen;
type(connect = T) -> T.
%% start/4
start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
peerT = PeerT,
options = SvcOpts,
service_name = SvcName,
service = Svc0})
when Type == connect;
Type == accept ->
#diameter_service{applications = Apps}
= Svc
= merge_service(Opts, Svc0),
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName,
PeerT,
Apps,
Mask]),
Opts,
SvcOpts,
Svc}),
insert(WatchdogT, #watchdog{pid = Pid,
type = Type,
ref = Ref,
options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
%% record so that each watchdog may get a different record. This
%% record is what is passed back into application callbacks.
s(Type, Ref, T) ->
{_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
Pid.
%% merge_service/2
merge_service(Opts, Svc) ->
lists:foldl(fun ms/2, Svc, Opts).
%% Limit the applications known to the fsm to those in the 'apps'
%% option. That this might be empty is checked by the fsm. It's not
%% checked at config-time since there's no requirement that the
%% service be configured first. (Which could be considered a bit odd.)
ms({applications, As}, #diameter_service{applications = Apps} = S)
when is_list(As) ->
S#diameter_service{applications
= [A || A <- Apps,
lists:member(A#diameter_app.alias, As)]};
%% The fact that all capabilities can be configured on the transports
%% means that the service doesn't necessarily represent a single
%% locally implemented Diameter peer as identified by Origin-Host: a
%% transport can configure its own Origin-Host. This means that the
%% service is little more than a placeholder for default capabilities
%% plus a list of applications that individual transports can choose
%% to support (or not).
ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
when is_list(Opts) ->
%% make_caps has already succeeded in diameter_config so it will succeed
%% again here.
{ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
Svc#diameter_service{capabilities = Caps};
ms(_, Svc) ->
Svc.
%% ---------------------------------------------------------------------------
%% # accepted/3
%% ---------------------------------------------------------------------------
accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
#watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
= Wd
= fetch(WatchdogT, Pid),
insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
start(Ref, T, Opts, S). %% start new watchdog
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
T.
%% ---------------------------------------------------------------------------
%% # watchdog/6
%%
%% React to a watchdog state transition.
%% ---------------------------------------------------------------------------
%% Watchdog has a new open connection.
watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
connection_up({TPid, T}, Wd, State);
%% Watchdog has a new connection that will be opened after DW[RA]
%% exchange.
watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
reopen({TPid, T}, Wd, State);
%% Watchdog has recovered a suspect connection.
watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
#watchdog{peer = TPid} = Wd, %% assert
connection_up(Wd, State);
%% Watchdog has an unresponsive connection.
watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
#watchdog{peer = TPid} = Wd, %% assert
connection_down(Wd, To, State);
%% Watchdog has lost its connection.
watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
close(Wd, S),
connection_down(Wd, To, S),
ets:delete(PeerT, TPid);
watchdog(_, [], _, _, _, _) ->
ok.
%% ---------------------------------------------------------------------------
%% # connection_up/3
%% ---------------------------------------------------------------------------
%% Watchdog process has reached state OKAY.
connection_up({TPid, {Caps, SupportedApps, Pkt}},
#watchdog{pid = Pid}
= Wd,
#state{peerT = PeerT}
= S) ->
Pr = #peer{pid = TPid,
apps = SupportedApps,
caps = Caps,
watchdog = Pid},
insert(PeerT, Pr),
connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
%% ---------------------------------------------------------------------------
%% # reopen/3
%% ---------------------------------------------------------------------------
reopen({TPid, {Caps, SupportedApps, _Pkt}},
#watchdog{pid = Pid}
= Wd,
#state{watchdogT = WatchdogT,
peerT = PeerT}) ->
insert(PeerT, #peer{pid = TPid,
apps = SupportedApps,
caps = Caps,
watchdog = Pid}),
insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
peer = TPid}).
%% ---------------------------------------------------------------------------
%% # connection_up/2
%% ---------------------------------------------------------------------------
%% Watchdog has recovered a suspect connection. Note that there has
%% been no new capabilities exchange in this case.
connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
connection_up([], Wd, fetch(PeerT, TPid), S).
%% connection_up/4
connection_up(Extra,
#watchdog{peer = TPid}
= Wd,
#peer{apps = SApps, caps = Caps}
= Pr,
#state{watchdogT = WatchdogT,
local_peers = LDict,
service_name = SvcName,
service = #diameter_service{applications = Apps}}
= S) ->
insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
diameter_traffic:peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
report_status(up, Wd, Pr, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
ilp({Id, Alias}, {TC, SA}, LDict) ->
init_conn(Id, Alias, TC, SA),
?Dict:append(Alias, TC, LDict).
init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
#diameter_app{id = Id} %% assert
= App
= find_app(Alias, Apps),
peer_cb(App, peer_up, [SvcName, TC])
orelse exit(TPid, kill). %% fake transport failure
%% ---------------------------------------------------------------------------
%% # find_incoming_app/3
%% ---------------------------------------------------------------------------
%% No one should be sending the relay identifier.
find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
Caps;
find_incoming_app(Peer, Id, Apps)
when is_integer(Id) ->
find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);
%% Note that the apps represented in SApps may be a strict subset of
%% those in Apps.
find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
case keyfind(Ids, 1, SApps) of
{_Id, Alias} ->
{#diameter_app{} = find_app(Alias, Apps), Caps};
false ->
Caps
end.
%% keyfind/3
keyfind([], _, _) ->
false;
keyfind([Key | Rest], Pos, L) ->
case lists:keyfind(Key, Pos, L) of
false ->
keyfind(Rest, Pos, L);
T ->
T
end.
%% find_outgoing_app/2
find_outgoing_app(Alias, Apps) ->
case find_app(Alias, Apps) of
#diameter_app{id = ?APP_ID_RELAY} ->
false;
A ->
A
end.
%% find_app/2
find_app(Alias, Apps) ->
lists:keyfind(Alias, #diameter_app.alias, Apps).
%% Don't bring down the service (and all associated connections)
%% regardless of what happens.
peer_cb(App, F, A) ->
try state_cb(App, F, A) of
ModS ->
mod_state(App#diameter_app.alias, ModS),
true
catch
E:R ->
diameter_lib:error_report({failure, {E, R, ?STACK}},
{App, F, A}),
false
end.
%% ---------------------------------------------------------------------------
%% # connection_down/3
%% ---------------------------------------------------------------------------
connection_down(#watchdog{state = ?WD_OKAY,
peer = TPid}
= Wd,
#peer{caps = Caps,
apps = SApps}
= Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
diameter_traffic:peer_down(TPid);
connection_down(#watchdog{}, #peer{}, _) ->
ok;
connection_down(#watchdog{state = ?WD_OKAY,
peer = TPid}
= Wd,
To,
#state{watchdogT = WatchdogT,
peerT = PeerT}
= S)
when is_atom(To) ->
insert(WatchdogT, Wd#watchdog{state = To}),
connection_down(Wd, fetch(PeerT, TPid), S);
connection_down(#watchdog{},
To,
#state{})
when is_atom(To) ->
ok.
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
rlp({Id, Alias}, {TC, SA}, LDict) ->
L = ?Dict:fetch(Alias, LDict),
down_conn(Id, Alias, TC, SA),
?Dict:store(Alias, lists:delete(TC, L), LDict).
down_conn(Id, Alias, TC, {SvcName, Apps}) ->
#diameter_app{id = Id} %% assert
= App
= find_app(Alias, Apps),
peer_cb(App, peer_down, [SvcName, TC]).
%% ---------------------------------------------------------------------------
%% # watchdog_down/2
%% ---------------------------------------------------------------------------
%% Watchdog process has died.
watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
Wd = fetch(WatchdogT, Pid),
ets:delete_object(WatchdogT, Wd),
restart(Wd,S),
wd_down(Wd,S).
%% Watchdog has never reached OKAY ...
wd_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
ok;
%% ... or maybe it has.
wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
connection_down(Wd, ?WD_DOWN, S),
ets:delete(PeerT, TPid).
%% restart/2
restart(Wd, S) ->
q_restart(restart(Wd), S).
%% restart/1
%% Always try to reconnect.
restart(#watchdog{ref = Ref,
type = connect = T,
options = Opts,
started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
restart(#watchdog{ref = Ref,
type = accept = T,
options = Opts,
peer = false,
started = Time}) ->
{Time, {Ref, T, Opts}};
%% ... or it has: a replacement has already been spawned.
restart(#watchdog{type = accept}) ->
false.
%% q_restart/2
%% Start the reconnect timer.
q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
start_tc(tc(Time, default_tc(Type, Opts)), T, S);
q_restart(false, _) ->
ok.
%% RFC 3588, 2.1:
%%
%% When no transport connection exists with a peer, an attempt to
%% connect SHOULD be periodically made. This behavior is handled via
%% the Tc timer, whose recommended value is 30 seconds. There are
%% certain exceptions to this rule, such as when a peer has terminated
%% the transport connection stating that it does not wish to
%% communicate.
default_tc(connect, Opts) ->
proplists:get_value(reconnect_timer, Opts, ?DEFAULT_TC);
default_tc(accept, _) ->
0.
%% Bound tc below if the watchdog was restarted recently to avoid
%% continuous restarts in case of faulty config or other problems.
tc(Time, Tc) ->
choose(Tc > ?RESTART_TC
orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
Tc,
?RESTART_TC).
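The bounding rule in tc/2 above can be tried in isolation. What follows is a minimal sketch, not part of diameter_service: the module name tc_sketch, the function bounded_tc/2 and the value 1000 for RESTART_TC are assumptions made for illustration (the real macro is defined outside this excerpt), and the elapsed time is passed in as microseconds instead of being computed with timer:now_diff/2.

```erlang
-module(tc_sketch).
-export([bounded_tc/2]).

%% Assumed value in milliseconds; the real ?RESTART_TC is defined
%% elsewhere in diameter_service.erl and is not shown here.
-define(RESTART_TC, 1000).

%% ElapsedMicros: microseconds since the watchdog was started
%%                (hypothetical parameter standing in for
%%                timer:now_diff(now(), Time)).
%% Tc:            configured reconnect delay in milliseconds.
%%
%% Tc is used as is when it already exceeds the lower bound or when
%% the watchdog has been alive long enough; otherwise the lower bound
%% replaces it.
bounded_tc(ElapsedMicros, Tc) ->
    case Tc > ?RESTART_TC orelse ElapsedMicros > 1000 * ?RESTART_TC of
        true  -> Tc;
        false -> ?RESTART_TC
    end.
```

Under these assumptions an accepting watchdog (Tc = 0) that dies immediately after start is restarted after RESTART_TC milliseconds rather than at once, while one that has lived longer than that is restarted without delay.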
start_tc(0, T, S) ->
tc_timeout(T, S);
start_tc(Tc, T, _) ->
erlang:send_after(Tc, self(), {tc_timeout, T}).
%% tc_timeout/2
tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
tc(diameter_config:have_transport(SvcName, Ref), T, S).
tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
= S) ->
send_event(SvcName, {reconnect, Ref, Opts}),
start(Ref, Type, Opts, S);
tc(false = No, _, _) -> %% removed
No.
%% ---------------------------------------------------------------------------
%% # close/2
%% ---------------------------------------------------------------------------
%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
%% another watchdog to be able to detect that it should transition
%% from initial into reopen rather than okay. That someone is either
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
close(#watchdog{type = connect}, _) ->
ok;
close(#watchdog{type = accept,
pid = Pid,
ref = Ref,
options = Opts},
#state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
c(Pid, true, Opts) ->
Tc = proplists:get_value(reconnect_timer, Opts, 2*?DEFAULT_TC),
erlang:send_after(Tc, Pid, close);
%% ... or now.
c(Pid, false, _Opts) ->
Pid ! close.
%% The RFCs only document the behaviour of Tc, our reconnect_timer,
%% for the establishment of connections but we also give
%% reconnect_timer semantics for a listener, being the time within
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
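To make the timing in close/2 and c/3 concrete, here is a small sketch of how the delay before the close message is chosen. It is an illustration only: the module name close_sketch, the function close_delay/2 and the value 30000 for DEFAULT_TC are assumptions (the latter taken from the 30 seconds recommended by RFC 3588 as quoted in the comments above), and a return value of 0 stands for the "send close now" branch.

```erlang
-module(close_sketch).
-export([close_delay/2]).

%% Assumed value in milliseconds; the real ?DEFAULT_TC is defined
%% outside the excerpt shown in this mail.
-define(DEFAULT_TC, 30000).

%% HaveTransport: whether the transport is still configured, i.e. the
%%                result of diameter_config:have_transport/2 in the
%%                original code.
%% Opts:          transport options as stored in the #watchdog{} record.
%%
%% Returns the number of milliseconds after which the accepting
%% watchdog would be told to close.
close_delay(true, Opts) ->
    proplists:get_value(reconnect_timer, Opts, 2 * ?DEFAULT_TC);
close_delay(false, _Opts) ->
    0.
```

With the options from the watchdog record shown earlier in this mail, {reconnect_timer,60000}, the sketch gives 60000, so under these assumptions each accepting watchdog whose connection went down stays in the table for about a minute before it is even asked to close.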
%% ---------------------------------------------------------------------------
%% # reconnect/2
%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
watchdogT = WatchdogT}) ->
#watchdog{ref = Ref,
type = connect,
options = Opts}
= fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
%% ---------------------------------------------------------------------------
%% # call_module/4
%% ---------------------------------------------------------------------------
%% Backwards compatibility and never documented/advertised. May be
%% removed.
call_module(Mod, Req, From, #state{service
= #diameter_service{applications = Apps},
service_name = Svc}
= S) ->
case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
Req,
From,
Svc)
of
{reply = T, RC} ->
{T, RC, S};
noreply = T ->
{T, S};
Reason ->
{reply, {error, Reason}, S}
end.
cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
Args = [Req, From, Svc],
try state_cb(App, handle_call, Args) of
{noreply = T, ModS} ->
mod_state(Alias, ModS),
T;
{reply = T, RC, ModS} ->
mod_state(Alias, ModS),
{T, RC};
T ->
diameter_lib:error_report({invalid, T},
{App, handle_call, Args}),
invalid
catch
E: Reason ->
diameter_lib:error_report({failure, {E, Reason, ?STACK}},
{App, handle_call, Args}),
failure
end;
cm([], _, _, _) ->
unknown;
cm([_,_|_], _, _, _) ->
multiple.
%% ---------------------------------------------------------------------------
%% # report_status/5
%% ---------------------------------------------------------------------------
report_status(Status,
#watchdog{ref = Ref,
peer = TPid,
type = Type,
options = Opts},
#peer{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
Extra) ->
share_peer(Status, Caps, As, TPid, S),
Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
send_event(SvcName, list_to_tuple(Info)).
%% send_event/2
send_event(SvcName, Info) ->
send_event(#diameter_event{service = SvcName,
info = Info}).
send_event(#diameter_event{service = SvcName} = E) ->
lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
%% ---------------------------------------------------------------------------
%% # share_peer/5
%% ---------------------------------------------------------------------------
share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
service_name = Svc}) ->
diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
share_peer(_, _, _, _, _) ->
ok.
%% ---------------------------------------------------------------------------
%% # share_peers/2
%% ---------------------------------------------------------------------------
share_peers(Pid, #state{options = [_, {_, true} | _],
local_peers = PDict}) ->
?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
share_peers(_, _) ->
ok.
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
%% ---------------------------------------------------------------------------
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
service = Svc,
shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
Key = #diameter_app.alias,
As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
rpu(Pid, Caps, PDict, As);
remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
ok.
rpu(_, _, PDict, []) ->
PDict;
rpu(Pid, Caps, PDict, Aliases) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
%% ---------------------------------------------------------------------------
%% # remote_peer_down/2
%% ---------------------------------------------------------------------------
remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
shared_peers = PDict}) ->
lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
%% ---------------------------------------------------------------------------
%% # pick_peer/4
%% ---------------------------------------------------------------------------
pick_peer(#diameter_app{alias = Alias}
= App,
RealmAndHost,
Filter,
#state{local_peers = L,
shared_peers = S,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
pick_peer(peers(Alias, RealmAndHost, Filter, L),
peers(Alias, RealmAndHost, Filter, S),
Pid,
SvcName,
App).
%% pick_peer/5
pick_peer([], [], _, _, _) ->
false;
%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
when self() /= Pid ->
case call_service(Pid, {pick_peer, Local, Remote, App}) of
{TPid, _} = T when is_pid(TPid) ->
T;
{error, _} ->
false
end;
%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
pick_peer(Local,
Remote,
_Pid,
SvcName,
#diameter_app{alias = Alias,
init_state = S,
mutable = M}
= App) ->
Args = [Local, Remote, SvcName],
try state_cb(App, pick_peer, Args) of
{ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
T;
{{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
mod_state(Alias, ModS),
T;
{false = No, ModS} when M ->
mod_state(Alias, ModS),
No;
{ok, false = No} ->
No;
false = No ->
No;
{{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
T; %% Accept returned state in the immutable
{false = No, S} -> %% case as long as it isn't changed.
No;
T ->
diameter_lib:error_report({invalid, T, App},
{App, pick_peer, Args})
catch
E: Reason ->
diameter_lib:error_report({failure, {E, Reason, ?STACK}},
{App, pick_peer, Args})
end.
%% peers/4
peers(Alias, RH, Filter, Peers) ->
case ?Dict:find(Alias, Peers) of
{ok, L} ->
ps(L, RH, Filter, {[],[]});
error ->
[]
end.
%% Place a peer whose Destination-Host/Realm matches those of the
%% request at the front of the result list. Could add some sort of
%% 'sort' option to allow more control.
ps([], _, _, {Ys, Ns}) ->
lists:reverse(Ys, Ns);
ps([{_TPid, #diameter_caps{} = Caps} = TC | Rest], RH, Filter, Acc) ->
ps(Rest, RH, Filter, pacc(caps_filter(Caps, RH, Filter),
caps_filter(Caps, RH, {all, [host, realm]}),
TC,
Acc)).
pacc(true, true, Peer, {Ts, Fs}) ->
{[Peer|Ts], Fs};
pacc(true, false, Peer, {Ts, Fs}) ->
{Ts, [Peer|Fs]};
pacc(_, _, _, Acc) ->
Acc.
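The ordering produced by ps/4 and pacc/4 above can be seen with a reduced sketch. It is an illustration under assumptions, not the original code: the module name ps_sketch and the function order/1 are hypothetical, and each peer is given as {Peer, FilterMatch, HostRealmMatch} with the two caps_filter results already computed.

```erlang
-module(ps_sketch).
-export([order/1]).

%% Each element is {Peer, FilterMatch, HostRealmMatch}, where the two
%% booleans stand in for the caps_filter/3 results in the original.
order(Peers) ->
    order(Peers, {[], []}).

%% Peers matching both the filter and host/realm come first, in their
%% original order; peers matching only the filter follow, in reverse
%% order of appearance (their accumulator is not reversed).
order([], {Ys, Ns}) ->
    lists:reverse(Ys, Ns);
order([{P, true, true} | Rest], {Ys, Ns}) ->
    order(Rest, {[P | Ys], Ns});
order([{P, true, false} | Rest], {Ys, Ns}) ->
    order(Rest, {Ys, [P | Ns]});
%% Peers that fail the filter are dropped.
order([_ | Rest], Acc) ->
    order(Rest, Acc).
```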
%% caps_filter/3
caps_filter(C, RH, {neg, F}) ->
not caps_filter(C, RH, F);
caps_filter(C, RH, {all, L})
when is_list(L) ->
lists:all(fun(F) -> caps_filter(C, RH, F) end, L);
caps_filter(C, RH, {any, L})
when is_list(L) ->
lists:any(fun(F) -> caps_filter(C, RH, F) end, L);
caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
eq(undefined, DH, OH);
caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
eq(undefined, DR, OR);
caps_filter(C, _, Filter) ->
caps_filter(C, Filter).
%% caps_filter/2
caps_filter(_, none) ->
true;
caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
eq(any, H, OH);
caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
eq(any, R, OR);
%% Anything else is expected to be an eval filter. Filter failure is
%% documented as being equivalent to a non-matching filter.
caps_filter(C, T) ->
try
{eval, F} = T,
diameter_lib:eval([F,C])
catch
_:_ -> false
end.
eq(Any, Id, PeerId) ->
Any == Id orelse try
iolist_to_binary(Id) == iolist_to_binary(PeerId)
catch
_:_ -> false
end.
%% OctetString() can be specified as an iolist() so test for string
%% rather than term equality.
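The difference between string and term equality mentioned in the comment above is easy to show. The following is a minimal sketch; the module name eq_sketch and the function same_octets/2 are made up for illustration and only reproduce the comparison part of eq/3.

```erlang
-module(eq_sketch).
-export([same_octets/2]).

%% Compare two values as octet strings: both are flattened to binaries
%% first, so a list, a binary and a nested iolist with the same bytes
%% compare equal. Anything that is not an iolist counts as a mismatch.
same_octets(A, B) ->
    try
        iolist_to_binary(A) == iolist_to_binary(B)
    catch
        _:_ -> false
    end.
```

For example, "example.com" == <<"example.com">> is false as a term comparison, while same_octets("example.com", <<"example.com">>) is true.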
%% transports/1
transports(#state{watchdogT = WatchdogT}) ->
ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
%% ---------------------------------------------------------------------------
%% # service_info/2
%% ---------------------------------------------------------------------------
%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
'Origin-Realm',
'Vendor-Id',
'Product-Name',
'Origin-State-Id',
'Host-IP-Address',
'Supported-Vendor-Id',
'Auth-Application-Id',
'Inband-Security-Id',
'Acct-Application-Id',
'Vendor-Specific-Application-Id',
'Firmware-Revision']).
%% The config returned by diameter:service_info(SvcName, all).
-define(ALL_INFO, [capabilities,
applications,
transport,
pending,
options]).
%% The rest.
-define(OTHER_INFO, [connections,
name,
peers,
statistics]).
service_info(Item, S)
when is_atom(Item) ->
case tagged_info(Item, S) of
{_, T} -> T;
undefined = No -> No
end;
service_info(Items, S) ->
tagged_info(Items, S).
tagged_info(Item, S)
when is_atom(Item) ->
case complete(Item) of
{value, I} ->
{I, complete_info(I,S)};
false ->
undefined
end;
tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
when is_pid(TPid) ->
try
[#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
[#watchdog{ref = Ref, type = Type, options = Opts}]
= ets:lookup(WatchdogT, Pid),
[{ref, Ref},
{type, Type},
{options, Opts}]
catch
error:_ ->
[]
end;
tagged_info(Items, S)
when is_list(Items) ->
[T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];
tagged_info(_, _) ->
undefined.
complete_info(Item, #state{service = Svc} = S) ->
case Item of
name ->
S#state.service_name;
'Origin-Host' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_host;
'Origin-Realm' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_realm;
'Vendor-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.vendor_id;
'Product-Name' ->
(Svc#diameter_service.capabilities)
#diameter_caps.product_name;
'Origin-State-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.origin_state_id;
'Host-IP-Address' ->
(Svc#diameter_service.capabilities)
#diameter_caps.host_ip_address;
'Supported-Vendor-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.supported_vendor_id;
'Auth-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.auth_application_id;
'Inband-Security-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.inband_security_id;
'Acct-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.acct_application_id;
'Vendor-Specific-Application-Id' ->
(Svc#diameter_service.capabilities)
#diameter_caps.vendor_specific_application_id;
'Firmware-Revision' ->
(Svc#diameter_service.capabilities)
#diameter_caps.firmware_revision;
capabilities -> service_info(?CAP_INFO, S);
applications -> info_apps(S);
transport -> info_transport(S);
options -> info_options(S);
pending -> info_pending(S);
keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
statistics -> info_stats(S);
connections -> info_connections(S);
peers -> info_peers(S)
end.
complete(I)
when I == keys;
I == all ->
{value, I};
complete(Pre) ->
P = atom_to_list(Pre),
case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO,
lists:prefix(P, atom_to_list(I))]
of
[I] -> {value, I};
_ -> false
end.
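complete/1 above lets a caller abbreviate an info item as long as the abbreviation is an unambiguous prefix. A minimal sketch of that rule follows; the module name complete_sketch is hypothetical and, to keep it short, the item list holds only the ALL_INFO and OTHER_INFO names from this file, not the capability names.

```erlang
-module(complete_sketch).
-export([complete/1, complete/2]).

%% Reduced item list for illustration: ?ALL_INFO ++ ?OTHER_INFO from
%% the original, without ?CAP_INFO.
-define(ITEMS, [capabilities, applications, transport, pending, options,
                connections, name, peers, statistics]).

complete(Pre) ->
    complete(Pre, ?ITEMS).

%% Return {value, Item} if exactly one item starts with the given
%% prefix, false if none or several do.
complete(Pre, Items) when is_atom(Pre) ->
    P = atom_to_list(Pre),
    case [I || I <- Items, lists:prefix(P, atom_to_list(I))] of
        [I] -> {value, I};
        _   -> false
    end.
```

So trans resolves to transport, while c is rejected because it matches both capabilities and connections.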
%% info_stats/1
info_stats(#state{watchdogT = WatchdogT}) ->
MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}],
try ets:select(WatchdogT, MatchSpec) of
L ->
diameter_stats:read(lists:append(L))
catch
error: badarg -> [] %% service has gone down
end.
%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
%% the accumulated values for the ref and associated watchdog/peer
%% pids.
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
RefsD = dict:map(fun(_, Ls) -> [P || L <- Ls, {peer, {P,_}} <- L] end,
PeerD),
Refs = lists:append(dict:fold(fun(R, Ps, A) -> [[R|Ps] | A] end,
[],
RefsD)),
Stats = diameter_stats:read(Refs),
dict:fold(fun(R, Ls, A) ->
Ps = dict:fetch(R, RefsD),
[[{ref, R} | transport(Ls)] ++ [stats([R|Ps], Stats)]
| A]
end,
[],
PeerD).
%% Only a config entry for a listening transport: use it.
transport([[{type, listen}, _] = L]) ->
L ++ [{accept, []}];
%% Only one config or peer entry for a connecting transport: use it.
transport([[{type, connect} | _] = L]) ->
L;
%% Peer entries: discard config. Note that the peer entries have
%% length at least 3.
transport([[_,_] | L]) ->
transport(L);
%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
[{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,L) || L <- Ls]}].
peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
try ets:tab2list(WatchdogT) of
L ->
lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
peer_acc(PeerT, Acc, #watchdog{pid = Pid,
type = Type,
ref = Ref,
options = Opts,
state = WS,
started = At,
peer = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
{watchdog, {Pid, At, WS}}
| info_peer(PeerT, TPid, WS)],
Acc).
info_peer(PeerT, TPid, WS)
when is_pid(TPid), WS /= ?WD_DOWN ->
try ets:lookup(PeerT, TPid) of
T -> info_peer(T)
catch
error: badarg -> [] %% service has gone down
end;
info_peer(_, _, _) ->
[].
%% The point of extracting the config here is so that 'transport' info
%% has one entry for each transport ref, the peer table only
%% containing entries that have a living watchdog.
config_dict(#state{service_name = SvcName}) ->
lists:foldl(fun config_acc/2,
dict:new(),
diameter_config:lookup(SvcName)).
config_acc({Ref, T, Opts}, Dict)
when T == listen;
T == connect ->
dict:store(Ref, [[{type, T}, {options, Opts}]], Dict);
config_acc(_, Dict) ->
Dict.
info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}
| try [{port, info_port(Pid)}] catch _:_ -> [] end];
info_peer([] = No) ->
No.
%% Extract information that the processes involved are expected to
%% "publish" in their process dictionaries. Simple but backhanded.
info_port(Pid) ->
{_, PD} = process_info(Pid, dictionary),
{_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
{TPid, {_Type, TMod, _Cfg}} = T,
{_, TD} = process_info(TPid, dictionary),
{_, Data} = lists:keyfind({TMod, info}, 1, TD),
[{owner, TPid},
{module, TMod}
| try TMod:info(Data) catch _:_ -> [] end].
%% Use the field names from diameter_caps instead of
%% diameter_base_CER to distinguish between the 2-tuple values
%% compared to the single capabilities values. Note also that the
%% returned list is tagged 'caps' rather than 'capabilities' to
%% emphasize the difference.
info_caps(#diameter_caps{} = C) ->
lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
info_apps(#state{service = #diameter_service{applications = Apps}}) ->
lists:map(fun mk_app/1, Apps).
mk_app(#diameter_app{} = A) ->
lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))).
%% info_pending/1
%%
%% One entry for each outgoing request whose answer is outstanding.
info_pending(#state{} = S) ->
diameter_traffic:pending(transports(S)).
%% info_connections/1
%%
%% One entry per transport connection. Statistics for each entry are
%% for the peer pid only.
info_connections(S) ->
ConnL = conn_list(S),
Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]),
[L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L].
conn_list(S) ->
lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S, dict:new()))).
conn_acc(Ref, Peers, Acc) ->
[[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)]
| Acc].
stats(Refs, Stats) ->
{statistics, dict:to_list(lists:foldl(fun(R,D) ->
stats_acc(R, D, Stats)
end,
dict:new(),
Refs))}.
stats_acc(Ref, Dict, Stats) ->
lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end,
Dict,
proplists:get_value(Ref, Stats, [])).
%% info_peers/1
%%
%% One entry per peer Origin-Host. Statistics for each entry are
%% accumulated values for all peer pids.
info_peers(S) ->
{PeerD, RefD} = lists:foldl(fun peer_acc/2,
{dict:new(), dict:new()},
conn_list(S)),
Refs = lists:append(dict:fold(fun(_, Rs, A) -> [Rs|A] end,
[],
RefD)),
Stats = diameter_stats:read(Refs),
dict:fold(fun(OH, Cs, A) ->
Rs = dict:fetch(OH, RefD),
[{OH, [{connections, Cs}, stats(Rs, Stats)]} | A]
end,
[],
PeerD).
peer_acc(Peer, {PeerD, RefD}) ->
[{TPid, _}, [{origin_host, {_, OH}} | _]]
= [proplists:get_value(K, Peer) || K <- [peer, caps]],
{dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}.
%% info_options/1
info_options(S) ->
S#state.options.