Distribution by another means

Sean Hinde Sean.Hinde@REDACTED
Wed Jun 14 16:08:34 CEST 2000


All,

The security didn't prove too hard so here is a new version. Based on the
RADIUS mechanism, an md5 hash of a shared secret concatenated with the Timer
reference is sent with each command. It is still open to spoofing of replies
but I guess that is not too dangerous.

It now also requires the crypto application to be started.

Feel free to make use of it as you wish, but please share any fixes and
improvements.

Sean


%%%----------------------------------------------------------------------
%%% File    : rpc_socket.erl
%%% Author  : ottuser local account <otpuser@REDACTED>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr protocols
%%% Created : 26 May 1999 by ottuser local account <otpuser@REDACTED>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection.
%%% It is spawned by rpc_listen, which sends it its own PID
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn('$Revision: 1.2 $ ').
-author('shinde@REDACTED').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/3, get_connection/1, worker/5]).

-behaviour(gen_server).


% Internal state for this socket process (one gen_server per connection).
% listen_pid and lis_socket are supplied by the listener through start/3;
% socket stays undefined until gen_tcp:accept/1 succeeds in handle_cast/2.
-record(state,{listen_pid,		% Pid of Listener (asked to create the next acceptor)
	       lis_socket,		% Listener Socket (passed to gen_tcp:accept/1)
	       socket = undefined,	% Socket ref of the accepted connection
	       secret = null}).		% Shared Secret used by check/3

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% Start a socket-handling gen_server linked to the caller.
%% ListenPid is the listener process, ListenSocket the socket to accept on
%% and Secret the shared secret used to verify incoming packets.
start(ListenPid, ListenSocket, Secret) ->
    InitArg = {ListenPid, ListenSocket, Secret},
    gen_server:start_link(?MODULE, InitArg, []).

%% Asynchronously tell the socket process Pid to start accepting a connection.
get_connection(Pid) ->
    Request = get_conn,
    gen_server:cast(Pid, Request).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% Build the initial state from the start/3 arguments; no socket is
%% accepted yet, so the socket field keeps its default.
init({ListenPid, ListenSocket, Secret}) ->
    InitialState = #state{secret = Secret,
                          lis_socket = ListenSocket,
                          listen_pid = ListenPid},
    {ok, InitialState}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% No synchronous requests are defined; every call is answered with ok
%% and leaves the state untouched.
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% get_conn: block in gen_tcp:accept/1 on the listen socket.
%% On success the listener is asked to create the next acceptor and the
%% accepted socket is stored in the state. Any error tuple or caught exit
%% stops this process with reason {error, accept_failed}.
handle_cast(get_conn, State) ->
    ListenSocket = State#state.lis_socket,
    case catch gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            rpc_listen:create(State#state.listen_pid, self()),
            {noreply, State#state{socket = Socket}};
        {error, _Reason} ->
            {stop, {error, accept_failed}, State};
        {'EXIT', _Reason} ->
            {stop, {error, accept_failed}, State}
    end;

%% Every other cast is ignored.
handle_cast(_Other, State) ->
    {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% A packet arrived on the connected socket.
%% {heartbeat, Ref, Checksum} is answered with {heart_reply, Ref} when the
%% checksum verifies; {apply, M, F, A, Ref, Checksum} spawns a linked
%% worker/5 when the checksum verifies. Packets whose checksum fails are
%% dropped silently; anything else is printed and ignored.
%% NOTE(review): Packet comes straight from the network and is decoded with
%% binary_to_term/1 before the checksum is checked - consider
%% binary_to_term(Packet, [safe]) if remote peers are not fully trusted.
handle_info({tcp, Socket, Packet}, State) ->
    case catch binary_to_term(Packet) of
        {heartbeat, Ref, Checksum} ->
            io:format("Heartbeat Received~n"),
            case check(Ref, State#state.secret, Checksum) of
                true ->
                    io:format("check ok~n"),
                    HeartReply = term_to_binary({heart_reply, Ref}),
                    gen_tcp:send(Socket, HeartReply);
                false ->
                    ok
            end,
            {noreply, State};
        {apply, M, F, A, Ref, Checksum} ->
            case check(Ref, State#state.secret, Checksum) of
                true ->
                    _Worker = spawn_link(?MODULE, worker,
                                         [M, F, A, Ref, Socket]);
                false ->
                    ok
            end,
            {noreply, State};
        Else ->
            io:format("Socket Received Else: ~p~n",[Else]),
            {noreply, State}
    end;

%% The peer closed the connection: stop this process.
handle_info({tcp_closed, _Socket}, State) ->
    {stop, rpc_skt_closed, State};

%% A socket error was reported: close our socket and stop.
handle_info({tcp_error, _Socket, _Reason}, State) ->
    gen_tcp:close(State#state.socket),
    {stop, rpc_skt_error, State};

%% Any other message is ignored.
handle_info(_AnyMessage, State) ->
    {noreply, State}.


%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
%% Close the accepted socket on shutdown, if one was ever accepted.
terminate(_Reason, #state{socket = Socket}) ->
    case Socket of
        undefined ->
            ok;
        _ ->
            gen_tcp:close(Socket),
            ok
    end.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
%% No state conversion is needed on code upgrade; the state is returned as is.
code_change(_OldVsn, State, _Extra) ->
    NewState = State,
    {ok, NewState}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Run one remote request and send the outcome back over Socket as
%% {reply, Ref, Result}. The call is wrapped in an old-style catch, so a
%% failing call is reported to the peer as the caught term instead of
%% crashing this worker.
worker(M, F, A, Ref, Socket) ->
    Result = (catch erlang:apply(M, F, A)),
    Frame = term_to_binary({reply, Ref, Result}),
    gen_tcp:send(Socket, Frame).

	    
	
%% Verify the checksum carried by an incoming packet.
%% Returns true when Checksum is exactly the MD5 digest of the external
%% term encoding of Ref followed by the shared Secret, false otherwise.
%% erlang:md5/1 accepts iodata, so it replaces both crypto:md5/1 and
%% concat_binary/1 (neither exists in current OTP releases) while hashing
%% the same bytes.
check(Ref, Secret, Checksum) ->
    Checksum =:= erlang:md5([term_to_binary(Ref), Secret]).

-----------------------------cut--------------------------------------------
------------------

%%%----------------------------------------------------------------------
%%% File    : rpc_listen.erl
%%% Author  : shinde local account <otpuser@REDACTED>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket connections
%%% Created : 26 May 1999 by ottuser local account <otpuser@REDACTED>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn('$Revision: 1.2 $ ').
-author('shinde@REDACTED').

-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).

-behaviour(gen_server).

%% Listener state: the listening socket opened in init/1 and the shared
%% secret (converted to a binary in init/1) handed to every rpc_socket
%% process that is started.
-record(state,{listen_socket = [],		% Listener socket reference
	       secret = null}).			% Shared Secret
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% Start the listener for Port, registered locally as rpc_server_<Port>.
%% Secret is the shared secret as a string/iolist (init/1 converts it with
%% list_to_binary/1).
start_link(Port, Secret) ->
    NameString = lists:flatten(io_lib:format("rpc_server_~w",[Port])),
    RegName = list_to_atom(NameString),
    gen_server:start_link({local, RegName}, ?MODULE, {Port, Secret}, []).


%% Access to this server from socket servers
%% Ask the listener ServerPid (asynchronously) to start a new acceptor.
%% Pid identifies the requesting process and is carried in the message.
create(ServerPid, Pid) ->
    Request = {create, Pid},
    gen_server:cast(ServerPid, Request).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% Open the listening socket and start the first acceptor.
%% Exits are trapped so that the termination of linked rpc_socket
%% processes arrives as 'EXIT' messages in handle_info/2.
init({Port, Secret}) ->
    process_flag(trap_exit, true),
    BinSecret = list_to_binary(Secret),
    ListenOpts = [binary, {packet, 2}, {active, true}, {reuseaddr, true}],
    case gen_tcp:listen(Port, ListenOpts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, ListenSocket} ->
            %% Start the acceptor process, then tell it to start accepting.
            {ok, Acceptor} = rpc_socket:start(self(), ListenSocket, BinSecret),
            rpc_socket:get_connection(Acceptor),
            {ok, #state{listen_socket = ListenSocket,
                        secret = BinSecret}}
    end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% No synchronous requests are defined; every call is answered with ok
%% and leaves the state untouched.
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% {create, Pid}: start a fresh rpc_socket process on the listen socket and
%% tell it to begin accepting. The Pid in the request is not used.
handle_cast({create, _Pid}, State) ->
    ListenSocket = State#state.listen_socket,
    Secret = State#state.secret,
    {ok, Acceptor} = rpc_socket:start(self(), ListenSocket, Secret),
    rpc_socket:get_connection(Acceptor),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
%% An acceptor exited because gen_tcp:accept/1 failed. Start off a new
%% acceptor (via a cast to ourselves); the original author's assumption is
%% that the listen socket is still open.
%% (In the posted source the tail of this comment had wrapped onto its own
%% line and was parsed as code, which is a syntax error.)
handle_info({'EXIT', _Pid, {error, accept_failed}}, State) ->
    create(self(), self()),
    {noreply, State};

%% Normal shutdown of a socket process: nothing to do.
handle_info({'EXIT', _Pid, normal}, State) ->
    {noreply, State};

%% Anything else (including other exit reasons) is printed and ignored.
handle_info(Info, State) ->
    io:format("Unexpected info: ~p~n",[Info]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
%% Close the listening socket when the listener shuts down.
terminate(_Reason, State) ->
    ListenSocket = State#state.listen_socket,
    gen_tcp:close(ListenSocket),
    ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
%% No state conversion is needed on code upgrade; the state is returned as is.
code_change(_OldVsn, State, _Extra) ->
    NewState = State,
    {ok, NewState}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

--------------------------------cut--------------------------

%%%----------------------------------------------------------------------
%%% File    : rpc_client.erl
%%% Author  : shinde@REDACTED local account <otpuser@REDACTED>
%%% Purpose : 
%%% Created : 11 Apr 2000 by shinde local account <otpuser@REDACTED>
%%%----------------------------------------------------------------------

-module(rpc_client).
-author('shinde@REDACTED').
-vsn('$Revision: 1.2 $ ').

-behaviour(gen_fsm).

%% External exports
-export([start_link/4]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
	 handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).

-export([call/5]).

%% Timing constants. The values are passed to erlang:start_timer/3 and
%% gen_fsm:sync_send_event/3 in this module, i.e. they are milliseconds.
%% Each comment now sits on its own line above the macro: in the posted
%% source two trailing comments had wrapped onto a following line and were
%% parsed as code.

%% Heartbeat response timeout
-define(HEART_TIMEOUT, 8000).
%% Inter heartbeat timeout
-define(HEART_PERIOD, 15000).
%% Command response timeout
-define(MAX_CMD_TIMEOUT, 5000).
%% Max number of heartbeat failures before closing socket
-define(HEART_FAIL_THRESHOLD, 3).
%% Period to wait before retrying the connection
-define(RETRY_PERIOD, 5000).

%% Connection state carried by the FSM. Field comments are kept short
%% enough to stay on one line: in the posted source most of them had
%% wrapped, leaving bare words inside the record definition.
-record(state, {heart_fails = 0,          % counter of heartbeat failures
                heart_ref = none,         % timer ref of repeating heartbeat timer
                heart_timeout_ref = none, % timer ref for heartbeat send timeout
                cmds = 0,                 % ets table of outstanding commands
                name = "",                % given name for this connection
                socket = 0,               % connected socket
                hostname = "",            % host to maintain the connection to
                port = 0,                 % listening port on above host
                secret = null}).          % shared secret for security

%% cmds contains {{cmd, Timer_ref}, Reply_to}

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% Start a client FSM for the connection called Name (a string), registered
%% locally as rpc_client_<Name>. Hostname/Port identify the server and
%% Secret is the shared secret (converted to a binary in init/1).
start_link(Name, Hostname, Port, Secret) ->
    LocalName = list_to_atom("rpc_client_" ++ Name),
    InitArg = {Name, Hostname, Port, Secret},
    gen_fsm:start_link({local, LocalName}, ?MODULE, InitArg, []).

%% Run M:F(A) on the server behind the connection called Name.
%% Timeout is handed to the FSM as the per-command timer value; the
%% synchronous call itself always waits at most ?MAX_CMD_TIMEOUT.
%% The FSM is addressed through its globally registered name.
call(Name, M, F, A, Timeout) ->
    Server = {global, list_to_atom("rpc_client_" ++ Name)},
    Request = {apply, M, F, A, Timeout},
    gen_fsm:sync_send_event(Server, Request, ?MAX_CMD_TIMEOUT).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}                   
%%----------------------------------------------------------------------
%% Register the FSM globally, create the table of outstanding commands and
%% enter the connecting state. The zero timeout makes gen_fsm deliver a
%% timeout event immediately, which triggers the first connection attempt.
init({Name, Hostname, Port, Secret}) ->
    GlobalName = list_to_atom("rpc_client_" ++ Name),
    global:re_register_name(GlobalName, self()),
    Cmds = ets:new(cmds, []),
    InitialData = #state{cmds = Cmds,
                         name = Name,
                         hostname = Hostname,
                         port = Port,
                         secret = list_to_binary(Secret)},
    {ok, connecting, InitialData, 0}.

%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------
%% connecting/2 - asynchronous events while not connected.
%%
%% The timeout event is delivered immediately on startup (zero timeout from
%% init/1) and again after every failed attempt. It tries to connect and, on
%% success, sends the first heartbeat and moves to wait_first_heart with the
%% heartbeat timeout timer stored in the state. On any failure it waits
%% ?RETRY_PERIOD and asks for another timeout event straight away.
%%
%% Changes from the posted source: the word "retrying" from a wrapped
%% comment was being parsed as code, and the retry delay was a hard-coded
%% 5000 although ?RETRY_PERIOD (same value) is defined for this purpose.
connecting(timeout, StateData) ->
    ConnectOpts = [binary, {active, true}, {packet, 2}],
    case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
                         ConnectOpts, 5000) of
        {ok, Socket} ->
            Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
            Heartbeat = {heartbeat, Ref, md5(Ref, StateData#state.secret)},
            case gen_tcp:send(Socket, term_to_binary(Heartbeat)) of
                ok ->
                    %% Drop any timer left over from a previous attempt.
                    cancel_timer(StateData#state.heart_timeout_ref),
                    {next_state, wait_first_heart,
                     StateData#state{socket = Socket,
                                     heart_timeout_ref = Ref,
                                     heart_fails = 0}};
                {error, _} ->
                    cancel_timer(Ref),
                    gen_tcp:close(Socket),
                    %% Wait for a while before retrying.
                    timer:sleep(?RETRY_PERIOD),
                    {next_state, connecting, StateData, 0}
            end;
        {error, _} ->
            timer:sleep(?RETRY_PERIOD),
            {next_state, connecting, StateData, 0}
    end;
%% Any other asynchronous event is ignored while connecting.
connecting(_, StateData) ->
    {next_state, connecting, StateData}.

%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}                    
%%----------------------------------------------------------------------
%% A command arrived before the first heartbeat reply: refuse it.
%% The FSM stays in wait_first_heart. The posted source moved to
%% `connecting` here, which left the freshly opened socket and its pending
%% heartbeat timer behind in a state that ignores both, so the client
%% could never finish (or retry) the connection.
wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
    {reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.

%% A command arrived while there is no connection: refuse it.
%% The reply carries a zero timeout so that a timeout event follows and the
%% connection attempt loop in connecting/2 keeps running. Without it, a
%% call that was queued during a retry wait would be received instead of
%% the pending timeout, and nothing would ever trigger another attempt.
connecting({apply, _, _, _, _}, _From, StateData) ->
    {reply, {error, not_connected}, connecting, StateData, 0}.

%% Send a command to the server. A timer of Timeout ms is started and its
%% reference doubles as the command identifier: it is sent (with its
%% checksum) in the request and used as the key under which the caller is
%% remembered in the cmds table. The caller is answered later, from
%% handle_info/3, unless the send itself fails.
connected({apply, M, F, A, Timeout}, From, StateData) ->
    TimerRef = erlang:start_timer(Timeout, self(), cmd_timeout),
    Request = {apply, M, F, A, TimerRef,
               md5(TimerRef, StateData#state.secret)},
    case gen_tcp:send(StateData#state.socket, term_to_binary(Request)) of
        {error, Reason} ->
            cancel_timer(TimerRef),
            {reply, {error, Reason}, connected, StateData};
        ok ->
            ets:insert(StateData#state.cmds, {{cmd, TimerRef}, From}),
            {next_state, connected, StateData}
    end.
	    
%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------
%% No all-state events are defined; stay in the current state unchanged.
handle_event(_Event, StateName, StateData) ->
    {next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}                    
%%----------------------------------------------------------------------
%% No synchronous all-state events are defined; answer ok and stay in the
%% current state unchanged.
handle_sync_event(_Event, _From, StateName, StateData) ->
    {reply, ok, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% handle_info/3 - timer expiries and socket messages, per state.
%%
%% Changes from the posted source:
%%  * three comment tails that had wrapped onto their own line (and were
%%    therefore parsed as code) are proper comments again;
%%  * a failed heartbeat send stores `none` (not `null`) in
%%    heart_timeout_ref, because cancel_timer/1 only treats `none` as
%%    "no timer" and would crash in erlang:cancel_timer/1 on `null`;
%%  * a stale heart_timeout in wait_first_heart keeps waiting, as its
%%    comment always said, instead of reconnecting with the socket open;
%%  * the heartbeat timeout timer is cancelled when the first heartbeat
%%    reply arrives, as is already done for later replies.
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received the timeout that belongs to the
%% current attempt: close the socket and start connecting again.
handle_info({timeout, Ref, heart_timeout},
            wait_first_heart, #state{heart_timeout_ref = Ref} = StateData) ->
    gen_tcp:close(StateData#state.socket),
    {next_state, connecting, StateData#state{socket = 0,
                                             heart_timeout_ref = none}, 0};

%% Waiting for first heartbeat. Received a timeout with another reference,
%% which could just be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _StaleRef, heart_timeout}, wait_first_heart, StateData) ->
    {next_state, wait_first_heart, StateData};

%% Wow, received some data.
%% NOTE(review): Data comes from the network and is decoded with
%% binary_to_term/1 - consider the [safe] option if the server is not
%% fully trusted.
handle_info({tcp, Socket, Data},
            wait_first_heart, #state{socket = Socket} = StateData) ->
    case catch binary_to_term(Data) of
        {heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
            %% The reply arrived in time: stop the timeout timer so it
            %% cannot be counted as a heartbeat failure later on.
            cancel_timer(Ref),
            Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
                                            send_heart),
            io:format("First Heart_timer_set: ~p~n", [Next_heart]),
            {next_state, connected, StateData#state{heart_ref = Next_heart}};
        _ ->
            gen_tcp:close(Socket),
            cancel_timer(StateData#state.heart_timeout_ref),
            {next_state, connecting, StateData, 0}
    end;

%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received a heartbeat timeout in connected phase with the failure counter
%% already at the threshold, which means we should close and start again.
handle_info({timeout, _Ref, heart_timeout}, connected, StateData)
  when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD ->
    gen_tcp:close(StateData#state.socket),
    cancel_timer(StateData#state.heart_ref),
    {next_state, connecting, StateData#state{socket = 0,
                                             heart_timeout_ref = none}, 0};

%% Received non final heartbeat timeout in connected phase, increment
%% counter.
handle_info({timeout, _Ref, heart_timeout}, connected, StateData) ->
    Heart_fails = StateData#state.heart_fails,
    {next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};

%% Time to send a new heartbeat.
handle_info({timeout, _Ref, send_heart}, connected, StateData) ->
    cancel_timer(StateData#state.heart_timeout_ref), % Just in case
    New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
    Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
    Heartbeat = {heartbeat, New_ref, md5(New_ref, StateData#state.secret)},
    case gen_tcp:send(StateData#state.socket, term_to_binary(Heartbeat)) of
        ok ->
            {next_state, connected,
             StateData#state{heart_timeout_ref = New_ref,
                             heart_ref = Next_heart}};
        {error, _} ->
            %% Nothing was sent, so no reply can come: count a failure now
            %% and record that no timeout timer is running.
            cancel_timer(New_ref),
            Heart_fails = StateData#state.heart_fails,
            {next_state, connected,
             StateData#state{heart_fails = Heart_fails + 1,
                             heart_timeout_ref = none,
                             heart_ref = Next_heart}}
    end;

%% Received timeout for a sent command: tell the caller, forget the command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
    Cmds = StateData#state.cmds,
    case ets:lookup(Cmds, {cmd, Ref}) of
        [{{cmd, Ref}, Reply_to}] ->
            gen_fsm:reply(Reply_to, {error, timed_out}),
            ets:delete(Cmds, {cmd, Ref}),
            {next_state, connected, StateData};
        [] ->
            {next_state, connected, StateData}
    end;

%% The real stuff - received a reply or heartbeat while connected.
%% NOTE(review): same binary_to_term/1 remark as in wait_first_heart.
handle_info({tcp, Socket, Data}, connected,
            #state{socket = Socket} = StateData) ->
    Cmds = StateData#state.cmds,
    case catch binary_to_term(Data) of
        {reply, Ref, Reply} ->
            case ets:lookup(Cmds, {cmd, Ref}) of
                [{{cmd, Ref}, Reply_to}] ->
                    gen_fsm:reply(Reply_to, Reply),
                    ets:delete(Cmds, {cmd, Ref}),
                    cancel_timer(Ref),
                    {next_state, connected, StateData};
                [] ->
                    %% Unknown or already timed out command: drop the reply.
                    {next_state, connected, StateData}
            end;
        {heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
            cancel_timer(Ref),
            {next_state, connected, StateData#state{heart_fails = 0}};
        Else ->
            io:format("unknown Packet received~p~n", [Else]),
            {next_state, connected, StateData}
    end;

%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
%% Ignore packets received in other states.
handle_info({tcp, _, _}, StateName, StateData) ->
    {next_state, StateName, StateData};

%% Our socket was closed by the peer: stop both heartbeat timers, deal with
%% the outstanding commands and start connecting again.
handle_info({tcp_closed, Socket}, _StateName,
            #state{socket = Socket} = StateData) ->
    cancel_timer(StateData#state.heart_ref),
    cancel_timer(StateData#state.heart_timeout_ref),
    reply_to_all(StateData#state.cmds),
    {next_state, connecting, StateData#state{socket = 0}, 0};

%% A socket error on our socket is only printed.
handle_info({tcp_error, Socket, Reason}, StateName,
            #state{socket = Socket} = StateData) ->
    io:format("Tcp_Error: ~p~n", [Reason]),
    {next_state, StateName, StateData};

%% Anything else is printed and ignored.
handle_info(Info, StateName, StateData) ->
    io:format("Unexpected Info: ~p~n", [Info]),
    {next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
%% Nothing is cleaned up explicitly when the FSM terminates.
terminate(_Reason, _StateName, _StateData) ->
    ok.

%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
%% No conversion is needed on code upgrade; state name and data are
%% returned as they are.
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Cancel a timer started with erlang:start_timer/3 and remove its timeout
%% message from the mailbox in case it had already fired. The atom `none`
%% stands for "no timer" and is accepted as a no-op. Always returns ok.
cancel_timer(none) ->
    ok;
cancel_timer(TimerRef) ->
    _ = erlang:cancel_timer(TimerRef),
    %% Zero-timeout receive: take the message if it is already queued,
    %% never wait for it.
    receive
        {timeout, TimerRef, _Msg} -> ok
    after 0 -> ok
    end.

%% Answer every caller that is still waiting for a command reply and empty
%% the table. Ets holds {{cmd, TimerRef}, ReplyTo} entries (see connected/3).
%% It is called when the socket has closed, so no reply can arrive any
%% more: each command timer is cancelled and each caller receives
%% {error, closed}. Returns ok.
%% The posted source was an empty stub, which left those callers waiting
%% until their own call timed out and leaked the entries and timers.
reply_to_all(Ets) ->
    lists:foreach(
      fun({{cmd, TimerRef}, Reply_to}) ->
              cancel_timer(TimerRef),
              gen_fsm:reply(Reply_to, {error, closed})
      end,
      ets:tab2list(Ets)),
    ets:delete_all_objects(Ets),
    ok.

%% Checksum sent with every request: the MD5 digest (a 16 byte binary) of
%% the external term encoding of Ref followed by the shared Secret.
%% erlang:md5/1 accepts iodata, so it replaces both crypto:md5/1 and
%% concat_binary/1 (neither exists in current OTP releases) while hashing
%% the same bytes.
md5(Ref, Secret) ->
    erlang:md5([term_to_binary(Ref), Secret]).





More information about the erlang-questions mailing list