Distribution by another means

Sean Hinde <>
Wed Jun 14 14:13:32 CEST 2000


OK, here it is..

A non blocking persistent client with heartbeats for socket maintenance.
Implemented using a gen_fsm (rpc_client)

A multithreaded tcp/ip server which allows for multiple connections.
Implemented using two gen_servers (rpc_listen.erl, rpc_socket.erl)

E.g Usage:

on the server node (note non distributed so no cheating!)

10>rpc_listen:start_link(3456).
{ok, <0.335.0>}

On the client node:

3>rpc_client:start_link("sean", "localhost", 3456).
{ok, <0.123.0>}
4>rpc_client:call("sean", io, format, ["How cool is Erlang?~n"], 3000).
ok


Please tear my code apart at will. I'm never very sure of the best way to
deal with socket errors on send, receive, and particularly in accept so this
needs some work (suggestions?)

Also it needs some security and more testing.

Sean

 <<rpc_client.erl>>  <<rpc_listen.erl>>  <<rpc_socket.erl>> 

============================================================

%%%----------------------------------------------------------------------
%%% File    : rpc_client.erl
%%% Author  :  local account <>
%%% Purpose : 
%%% Created : 11 Apr 2000 by shinde local account <>
%%%----------------------------------------------------------------------

-module(rpc_client).
-author('').
-vsn(1).

-behaviour(gen_fsm).

%% External exports
-export([start_link/3]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
	 handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).

-export([call/5]).

-define(HEART_TIMEOUT, 8000).			% Heartbeat response timeout
-define(HEART_PERIOD, 15000).			% Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000).			% Command response timeout
-define(HEART_FAIL_THRESHOLD, 3).		% Max number of heartbeat
failures before closing socket
-define(RETRY_PERIOD, 5000).			% Period to wait before
retrying the connection

-record(state, {heart_fails = 0,		% counter of heartbeat
failures
		heart_ref = none,		% Timer Ref of repeating
heartbeat timer
		heart_timeout_ref = none,	% Timer Ref for heartbeat
send timeout
		cmds = 0,			% ets table containing
currently outstanding commands
		name = "",			% Given name for this
connection
		socket = 0,			% connected socket
		hostname = "",			% hostname to maintain
client connection to
		port = 0}).			% listening port on above
host

%% cmds contains {{cmd, Timer_ref}, Reply_to}

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Name, Hostname, Port) ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port},
[]).

call(Name, M, F, A, Timeout) ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
?MAX_CMD_TIMEOUT).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}                   
%%----------------------------------------------------------------------
init({Name, Hostname, Port}) ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    global:re_register_name(Reg_name, self()),
    {ok, connecting, #state{cmds = ets:new(cmds, []),
			    name = Name,
			    hostname = Hostname,
			    port = Port}, 0}.	% send timeout immediately

%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
    case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
			 [binary, {active, true}, {packet, 2}], 5000) of
	{ok, Socket} ->
	    Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
	    case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref})) of
		ok  ->
		    cancel_timer(StateData#state.heart_timeout_ref),
		    {next_state, wait_first_heart, StateData#state{socket =
Socket,
	
heart_timeout_ref = Ref,
	
heart_fails = 0}};
		{error, _} ->
		    cancel_timer(Ref),
		    gen_tcp:close(Socket),
		    timer:sleep(5000),		% Wait for a while before
retrying
		    {next_state, connecting, StateData, 0}
	    end;
	{error, _} ->
	    timer:sleep(5000),
	    {next_state, connecting, StateData, 0}
    end;
connecting(_, StateData) ->
    {next_state, connecting, StateData}.

%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}                    
%%----------------------------------------------------------------------
wait_first_heart({apply, _, _, _, _}, From, StateData) ->
    {reply, {error, not_yet_heartbeating}, connecting, StateData}.

connecting({apply, _, _, _, _}, From, StateData) ->
    {reply, {error, not_connected}, connecting, StateData}.

connected({apply, M, F, A, Timeout}, From, StateData) ->
    Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
    case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref})) of
	ok ->
	    ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
	    {next_state, connected, StateData};
	{error, Reason} ->
	    cancel_timer(Ref),
	    {reply, {error, Reason}, connected, StateData}
    end.
	    
%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
    {next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}                    
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
    Reply = ok,
    {reply, Reply, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                         
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received timeout which is the current
attempt
handle_info({timeout, Ref, heart_timeout}, 
	    wait_first_heart, #state{heart_timeout_ref = Ref} = StateData)
->
    gen_tcp:close(StateData#state.socket),
    {next_state, connecting, StateData#state{socket = 0,
					     heart_timeout_ref = none}, 0};

%% Waiting for first heartbeat. Received timeout which could just
%% be a none cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, Ref, heart_timeout}, wait_first_heart, StateData) ->
    {next_state, connecting, StateData, 0};

%% Wow, received some data
handle_info({tcp, Socket, Data}, 
	    wait_first_heart, #state{socket = Socket} = StateData) ->
    case catch binary_to_term(Data) of
	{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
	    Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
send_heart),
%	    io:format("First Heart_timer_set: ~p~n", [Next_heart]),
	    {next_state, connected, StateData#state{heart_ref =
Next_heart}};
	_ ->
	    gen_tcp:close(Socket),
	    cancel_timer(StateData#state.heart_timeout_ref),
	    {next_state, connecting, StateData, 0}
    end;

%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received Nth heartbeat timeout in connected phase which means we 
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
  when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
    gen_tcp:close(StateData#state.socket),
    cancel_timer(StateData#state.heart_ref),
    {next_state, connecting, StateData#state{socket = 0,
					     heart_timeout_ref = none}, 0};

%% Received non final heartbeat timeout in connected phase, increment
counter.
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
    Heart_fails = StateData#state.heart_fails,
    {next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};

%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
    cancel_timer(StateData#state.heart_timeout_ref), % Just in case
    New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
    Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
%    io:format("Heart_timer_set: ~p~n", [Next_heart]),
    case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref})) of
	ok ->
	    {next_state, connected, StateData#state{heart_timeout_ref =
New_ref,
						    heart_ref =
Next_heart}};
	{error, _} ->
	    cancel_timer(New_ref),
	    Heart_fails = StateData#state.heart_fails,
	    {next_state, connected, StateData#state{heart_fails =
Heart_fails + 1,
						    heart_timeout_ref =
null,
						    heart_ref = Next_heart}}
    end;

%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
    Cmds = StateData#state.cmds,
    case ets:lookup(Cmds, {cmd, Ref}) of
	[{{cmd, Ref}, Reply_to}] ->
	    gen_fsm:reply(Reply_to, {error, timed_out}),
	    ets:delete(Cmds, {cmd, Ref}),
	    {next_state, connected, StateData};
	[] ->
	    {next_state, connected, StateData}
    end;

%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
    Cmds = StateData#state.cmds,
    case catch binary_to_term(Data) of
	{reply, Ref, Reply} ->
	    case ets:lookup(Cmds, {cmd, Ref}) of
		[{{cmd, Ref}, Reply_to}] ->
		    gen_fsm:reply(Reply_to, Reply),
		    ets:delete(Cmds, {cmd, Ref}),
		    cancel_timer(Ref),
		    {next_state, connected, StateData};
		[] ->
		    {next_state, connected, StateData}
	    end;
	{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
	    cancel_timer(Ref),
	    {next_state, connected, StateData#state{heart_fails = 0}};
	_ ->
	    {next_state, connected, StateData}
    end;

%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
handle_info({tcp, _, _}, StateName, StateData) -> % Ignore Packets received
in other states
    {next_state, StateName, StateData};

handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
    cancel_timer(StateData#state.heart_ref),
    cancel_timer(StateData#state.heart_timeout_ref),
    reply_to_all(StateData#state.cmds),
    {next_state, connecting, StateData#state{socket = 0},0};

handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
    io:format("Tcp_Error: ~p~n", [Reason]),
    {next_state, StateName, StateData};

handle_info(Info, StateName, StateData) ->
    io:format("Unexpected Info: ~p~n", [Info]),
    {next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(Reason, StateName, StatData) ->
    ok.

%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(OldVsn, StateName, StateData, Extra) ->
    {ok, StateName, StateData}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

cancel_timer(none) ->
    ok;
cancel_timer(Ref) ->
    erlang:cancel_timer(Ref),
    receive
	{timeout, Ref, _} ->
	    ok
    after 0 ->
	    ok
    end.

reply_to_all(Ets) ->
    ok.

=====================================================================

%%%----------------------------------------------------------------------
%%% File    : rpc_listen.erl
%%% Author  : shinde local account <>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket
connections
%%% Created : 26 May 1999 by ottuser local account <>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn(1).
-author('').

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).

-behaviour(gen_server).

-record(state,{listen_socket = []}).		% Listener socket reference
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Port) ->
    Name =
list_to_atom(lists:flatten(io_lib:format("rpc_server_~w",[Port]))),
    gen_server:start_link({local, Name}, ?MODULE, Port, []).


%% Access to this server from socket servers
create(ServerPid, Pid) ->
    gen_server:cast(ServerPid, {create, Pid}).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init(Port) ->
    process_flag(trap_exit, true),
    case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
			      {reuseaddr,true}]) of
	{ok, ListenSocket} ->
	    {ok, Pid} = rpc_socket:start(self(), ListenSocket),	% Start
acceptor process
	    rpc_socket:get_connection(Pid),	% Tell it to start accepting
	    {ok, #state{listen_socket = ListenSocket}};
	{error, Reason} ->
	    {stop, Reason}
    end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
    {reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({create,Pid}, State) ->
    {ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket),
    rpc_socket:get_connection(NewPid),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
% 
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
    create(self(), self()),			% Start off new acceptor as
listen socket is still open
    {noreply,State};

% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
    {noreply,State};

handle_info(Info, State) ->
    io:format("Unexpected info: ~p~n",[Info]),
    {noreply,State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason,State) ->
    gen_tcp:close(State#state.listen_socket),
    ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------


==============================================================


%%%----------------------------------------------------------------------
%%% File    : rpc_socket.erl
%%% Author  : ottuser local account <>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr
protocols
%%% Created : 26 May 1999 by ottuser local account <>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection
%%% It is spawned by tcp_listen, which send it it's own PID
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn(1).
-author('').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/2, get_connection/1, worker/5]).

-behaviour(gen_server).


% Internal state for this socket process
-record(state,{listen_pid,		% Pid of Listener
	       lis_socket,		% Listener Socket
	       socket = undefined}).	% Socket ref

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start(ListenPid, ListenSocket) ->
    gen_server:start_link(rpc_socket, {ListenPid, ListenSocket},[]).

get_connection(Pid) ->
    gen_server:cast(Pid, get_conn).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init({ListenPid, ListenSocket}) ->
    {ok, #state{listen_pid = ListenPid,
		lis_socket = ListenSocket}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
    {reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(get_conn, State) ->
    case catch gen_tcp:accept(State#state.lis_socket) of
	{error, closed} ->
	    {stop, {error, accept_failed}, State};
	{error, Reason} ->
	    {stop, {error, accept_failed}, State};
	{'EXIT', Reason} ->
	    {stop, {error, accept_failed}, State};
	{ok, Socket} ->
	    rpc_listen:create(State#state.listen_pid, self()),
	    {noreply, State#state{socket = Socket}}
    end;

handle_cast(_Reply ,State) ->
    {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({tcp, Socket, Packet}, State) ->
    case catch binary_to_term(Packet) of
	{heartbeat, Ref} ->
%	    io:format("Heartbeat Received~n"),
	    gen_tcp:send(Socket, term_to_binary({heart_reply, Ref})),
	    {noreply, State};
	{apply, M, F, A, Ref} ->
	    Pid = spawn_link(?MODULE, worker, [M, F, A, Ref, Socket]),
	    {noreply, State};
	Else ->
	    io:format("Socket Received Else: ~p~n",[Else]),
	    {noreply, State}
    end;

handle_info({tcp_closed, Socket}, State) ->
    {stop, rpc_skt_closed, State};

handle_info({tcp_error, Socket, Reason}, State) ->
    gen_tcp:close(State#state.socket),
    {stop, rpc_skt_error, State};


handle_info(Anymessage, State) ->
    {noreply, State}.


%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, #state{socket = undefined}) ->
    ok;
terminate(Reason, #state{socket = Socket}) ->
    gen_tcp:close(Socket),
    ok.	

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

worker(M, F, A, Ref, Socket) ->
    Reply = (catch apply(M, F, A)),
%    io:format("Reply: ~p~n", [Reply]),
    gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).

	    
	

==================================================END

-------------- next part --------------
A non-text attachment was scrubbed...
Name: rpc_client.erl
Type: application/octet-stream
Size: 13185 bytes
Desc: not available
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20000614/fb38a691/attachment.obj>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: rpc_listen.erl
Type: application/octet-stream
Size: 4526 bytes
Desc: not available
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20000614/fb38a691/attachment-0001.obj>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: rpc_socket.erl
Type: application/octet-stream
Size: 5362 bytes
Desc: not available
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20000614/fb38a691/attachment-0002.obj>


More information about the erlang-questions mailing list