Sean Hinde
Wed Jun 14 16:08:34 CEST 2000
The security didn't prove too hard so here is a new version. Based on the
RADIUS mechanism, an md5 hash of a shared secret concatenated with the Timer
reference is sent with each command. It is still open to spoofing of replies
but I guess that is not too dangerous.
It now also requires the crypto app to be started
Feel free to make use as you wish, but please share any fixes and
%%% File : rpc_socket.erl
%%% Author : ottuser local account <otpuser@REDACTED>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr
%%% Created : 26 May 1999 by ottuser local account <otpuser@REDACTED>
%%% This gen_server exists for the life of a socket connection
%%% It is spawned by tcp_listen, which send it it's own PID
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
-vsn('$Revision: 1.2 $ ').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-export([start/3, get_connection/1, worker/5]).
% Internal state for this socket process
-record(state,{listen_pid, % Pid of Listener
lis_socket, % Listener Socket
socket = undefined, % Socket ref
secret = null}). % Shared Secret
%%% API
start(ListenPid, ListenSocket, Secret) ->
gen_server:start_link(rpc_socket, {ListenPid, ListenSocket, Secret},[]).
get_connection(Pid) ->
gen_server:cast(Pid, get_conn).
%%% Callback functions from gen_server
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
init({ListenPid, ListenSocket, Secret}) ->
{ok, #state{listen_pid = ListenPid,
lis_socket = ListenSocket,
secret = Secret}}.
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
handle_call(Request,From,State) ->
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
handle_cast(get_conn, State) ->
case catch gen_tcp:accept(State#state.lis_socket) of
{error, closed} ->
{stop, {error, accept_failed}, State};
{error, Reason} ->
{stop, {error, accept_failed}, State};
{'EXIT', Reason} ->
{stop, {error, accept_failed}, State};
{ok, Socket} ->
rpc_listen:create(State#state.listen_pid, self()),
{noreply, State#state{socket = Socket}}
handle_cast(_Reply ,State) ->
{noreply, State}.
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
handle_info({tcp, Socket, Packet}, State) ->
case catch binary_to_term(Packet) of
{heartbeat, Ref, Checksum} ->
io:format("Heartbeat Received~n"),
case check(Ref, State#state.secret, Checksum) of
true ->
io:format("check ok~n"),
gen_tcp:send(Socket, term_to_binary({heart_reply,
{noreply, State};
false ->
{noreply, State}
{apply, M, F, A, Ref, Checksum} ->
case check(Ref, State#state.secret, Checksum) of
true ->
Pid = spawn_link(?MODULE, worker, [M, F, A, Ref,
{noreply, State};
false ->
{noreply, State}
Else ->
io:format("Socket Received Else: ~p~n",[Else]),
{noreply, State}
handle_info({tcp_closed, Socket}, State) ->
{stop, rpc_skt_closed, State};
handle_info({tcp_error, Socket, Reason}, State) ->
{stop, rpc_skt_error, State};
handle_info(Anymessage, State) ->
{noreply, State}.
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
terminate(Reason, #state{socket = undefined}) ->
terminate(Reason, #state{socket = Socket}) ->
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%% Internal functions
worker(M, F, A, Ref, Socket) ->
Reply = (catch apply(M, F, A)),
% io:format("Reply: ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).
check(Ref, Secret, Checksum) ->
Checksum == crypto:md5(concat_binary([term_to_binary(Ref), Secret])).
%%% File : rpc_listen.erl
%%% Author : shinde local account <otpuser@REDACTED>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket
%%% Created : 26 May 1999 by ottuser local account <otpuser@REDACTED>
-vsn('$Revision: 1.2 $ ').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-record(state,{listen_socket = [], % Listener socket reference
secret = null}). % Shared Secret
%%% API
start_link(Port, Secret) ->
Name =
gen_server:start_link({local, Name}, ?MODULE, {Port, Secret}, []).
%% Access to this server from socket servers
create(ServerPid, Pid) ->
gen_server:cast(ServerPid, {create, Pid}).
%%% Callback functions from gen_server
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
init({Port, Secret}) ->
process_flag(trap_exit, true),
Bin_secret = list_to_binary(Secret),
case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
{reuseaddr,true}]) of
{ok, ListenSocket} ->
{ok, Pid} = rpc_socket:start(self(), ListenSocket, Bin_secret),
% Start acceptor process
rpc_socket:get_connection(Pid), % Tell it to start accepting
{ok, #state{listen_socket = ListenSocket,
secret = Bin_secret}};
{error, Reason} ->
{stop, Reason}
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
handle_call(Request,From,State) ->
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
handle_cast({create,Pid}, State) ->
{ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket,
{noreply, State}.
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
create(self(), self()), % Start off new acceptor as
listen socket is still open
% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
handle_info(Info, State) ->
io:format("Unexpected info: ~p~n",[Info]),
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
terminate(Reason,State) ->
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%% Internal functions
%%% File : rpc_client.erl
%%% Author : shinde@REDACTED local account <otpuser@REDACTED>
%%% Purpose :
%%% Created : 11 Apr 2000 by shinde local account <otpuser@REDACTED>
-vsn('$Revision: 1.2 $ ').
%% External exports
%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connected/3, connecting/3, wait_first_heart/3]).
-define(HEART_TIMEOUT, 8000). % Heartbeat response timeout
-define(HEART_PERIOD, 15000). % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000). % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3). % Max number of heartbeat
failures before closing socket
-define(RETRY_PERIOD, 5000). % Period to wait before
retrying the connection
-record(state, {heart_fails = 0, % counter of heartbeat
heart_ref = none, % Timer Ref of repeating
heartbeat timer
heart_timeout_ref = none, % Timer Ref for heartbeat
send timeout
cmds = 0, % ets table containing
currently outstanding commands
name = "", % Given name for this
socket = 0, % connected socket
hostname = "", % hostname to maintain
client connection to
port = 0, % listening port on above
secret = null}). % shared secret for security
%% cmds contains {{cmd, Timer_ref}, Reply_to}
%%% API
start_link(Name, Hostname, Port, Secret) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port,
Secret}, []).
call(Name, M, F, A, Timeout) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
%%% Callback functions from gen_fsm
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
init({Name, Hostname, Port, Secret}) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
global:re_register_name(Reg_name, self()),
{ok, connecting, #state{cmds = ets:new(cmds, []),
name = Name,
hostname = Hostname,
secret = list_to_binary(Secret),
port = Port}, 0}. % send timeout immediately
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
[binary, {active, true}, {packet, 2}], 5000) of
{ok, Socket} ->
Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref,
md5(Ref, StateData#state.secret)})) of
ok ->
{next_state, wait_first_heart, StateData#state{socket =
heart_timeout_ref = Ref,
heart_fails = 0}};
{error, _} ->
timer:sleep(5000), % Wait for a while before
{next_state, connecting, StateData, 0}
{error, _} ->
{next_state, connecting, StateData, 0}
connecting(_, StateData) ->
{next_state, connecting, StateData}.
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
wait_first_heart({apply, _, _, _, _}, From, StateData) ->
{reply, {error, not_yet_heartbeating}, connecting, StateData}.
connecting({apply, _, _, _, _}, From, StateData) ->
{reply, {error, not_connected}, connecting, StateData}.
connected({apply, M, F, A, Timeout}, From, StateData) ->
Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref, md5(Ref, StateData#state.secret)})) of
ok ->
ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
{next_state, connected, StateData};
{error, Reason} ->
{reply, {error, Reason}, connected, StateData}
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}.
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%% State: wait_first_heart
%% Waiting for first heartbeat. Received timeout which is the current
handle_info({timeout, Ref, heart_timeout},
wait_first_heart, #state{heart_timeout_ref = Ref} = StateData)
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Waiting for first heartbeat. Received timeout which could just
%% be a none cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, Ref, heart_timeout}, wait_first_heart, StateData) ->
{next_state, connecting, StateData, 0};
%% Wow, received some data
handle_info({tcp, Socket, Data},
wait_first_heart, #state{socket = Socket} = StateData) ->
case catch binary_to_term(Data) of
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
io:format("First Heart_timer_set: ~p~n", [Next_heart]),
{next_state, connected, StateData#state{heart_ref =
_ ->
{next_state, connecting, StateData, 0}
%% State: connected
%% Received Nth heartbeat timeout in connected phase which means we
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Received non final heartbeat timeout in connected phase, increment
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};
%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
cancel_timer(StateData#state.heart_timeout_ref), % Just in case
New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
% io:format("Heart_timer_set: ~p~n", [Next_heart]),
case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref, md5(New_ref, StateData#state.secret)})) of
ok ->
{next_state, connected, StateData#state{heart_timeout_ref =
heart_ref =
{error, _} ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails =
Heart_fails + 1,
heart_timeout_ref =
heart_ref = Next_heart}}
%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
Cmds = StateData#state.cmds,
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, {error, timed_out}),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
Cmds = StateData#state.cmds,
case catch binary_to_term(Data) of
{reply, Ref, Reply} ->
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, Reply),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
{next_state, connected, StateData#state{heart_fails = 0}};
Else ->
io:format("unknown Packet received~p~n", [Else]),
{next_state, connected, StateData}
%% State: all states
handle_info({tcp, _, _}, StateName, StateData) -> % Ignore Packets received
in other states
{next_state, StateName, StateData};
handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
{next_state, connecting, StateData#state{socket = 0},0};
handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
io:format("Tcp_Error: ~p~n", [Reason]),
{next_state, StateName, StateData};
handle_info(Info, StateName, StateData) ->
io:format("Unexpected Info: ~p~n", [Info]),
{next_state, StateName, StateData}.
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
terminate(Reason, StateName, StatData) ->
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.
%%% Internal functions
cancel_timer(none) ->
cancel_timer(Ref) ->
{timeout, Ref, _} ->
after 0 ->
reply_to_all(Ets) ->
md5(Ref, Secret) ->
crypto:md5(concat_binary([term_to_binary(Ref), Secret])).
