%%%----------------------------------------------------------------------
%%% File    : rpc_client.erl
%%% Author  : shinde@one2one.co.uk local account
%%% Purpose : gen_fsm that keeps a TCP connection open to a remote host,
%%%           heartbeats it, and forwards {apply, M, F, A} requests.
%%% Created : 11 Apr 2000 by shinde local account
%%%----------------------------------------------------------------------

-module(rpc_client).
-author('shinde@one2one.co.uk').
-vsn(1).

-behaviour(gen_fsm).

%% External exports
-export([start_link/3]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
         terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).
-export([call/5]).

-define(HEART_TIMEOUT, 8000).      % Heartbeat response timeout
-define(HEART_PERIOD, 15000).      % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000).    % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3).  % Max number of heartbeat failures before closing socket
-define(RETRY_PERIOD, 5000).       % Period to wait before retrying the connection
-define(CALL_MARGIN, 1000).        % Extra time call/5 waits beyond the command timeout

-record(state, {heart_fails = 0,          % counter of heartbeat failures
                heart_ref = none,         % Timer Ref of repeating heartbeat timer
                heart_timeout_ref = none, % Timer Ref for heartbeat send timeout
                cmds = 0,                 % ets table containing currently outstanding commands
                name = "",                % Given name for this connection
                socket = 0,               % connected socket
                hostname = "",            % hostname to maintain client connection to
                port = 0}).               % listening port on above host

%% cmds contains {{cmd, Timer_ref}, Reply_to}

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: start_link/3
%% Purpose: Start a client FSM for the connection called Name (a string),
%%          locally registered as 'rpc_client_<Name>'. init/1 additionally
%%          registers the same name globally.
%% Note: Name is turned into an atom, so it must come from trusted
%%       configuration; atoms are never garbage collected.
%% Returns: whatever gen_fsm:start_link/4 returns
%%----------------------------------------------------------------------
start_link(Name, Hostname, Port) ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port}, []).

%%----------------------------------------------------------------------
%% Func: call/5
%% Purpose: Ask the globally registered client Name to run M:F(A) on the
%%          remote side, allowing Timeout milliseconds for the answer.
%% Returns: the reply sent back by the FSM, e.g. the remote result,
%%          {error, not_connected}, {error, not_yet_heartbeating},
%%          {error, timed_out} or {error, Reason} from a failed send.
%% Exits:   if the FSM itself does not answer within the call timeout.
%%----------------------------------------------------------------------
call(Name, M, F, A, Timeout) when is_integer(Timeout), Timeout >= 0 ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    %% The FSM arms its own Timeout ms timer and answers {error, timed_out}
    %% when it fires. Wait a little longer than that so the caller gets the
    %% tagged error rather than a gen_fsm timeout exit; never wait less than
    %% the historical ?MAX_CMD_TIMEOUT.
    Call_timeout = erlang:max(?MAX_CMD_TIMEOUT, Timeout + ?CALL_MARGIN),
    gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout}, Call_timeout).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Purpose: Register the process globally as 'rpc_client_<Name>', create
%%          the table of outstanding commands and start in 'connecting'.
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}
%%----------------------------------------------------------------------
init({Name, Hostname, Port}) ->
    Reg_name = list_to_atom("rpc_client_" ++ Name),
    global:re_register_name(Reg_name, self()),
    StateData = #state{cmds = ets:new(cmds, []),
                       name = Name,
                       hostname = Hostname,
                       port = Port},
    {ok, connecting, StateData, 0}. % send timeout immediately

%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%%----------------------------------------------------------------------

%% Connection attempt. Triggered by the 0 ms timeout returned from init/1
%% and by every transition back into 'connecting'.
%% On success the first heartbeat is sent and the FSM waits for its reply
%% in 'wait_first_heart'. On failure the process sleeps for ?RETRY_PERIOD
%% and tries again.
%% NOTE(review): timer:sleep/1 blocks the whole FSM, so callers queue up
%% (and may time out) while it sleeps; kept because the retry back-off
%% depends on it.
connecting(timeout, StateData) ->
    Host = StateData#state.hostname,
    Port = StateData#state.port,
    case gen_tcp:connect(Host, Port, [binary, {active, true}, {packet, 2}], 5000) of
        {ok, Socket} ->
            Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
            case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref})) of
                ok ->
                    %% Drop any heartbeat timer left over from a previous attempt.
                    cancel_timer(StateData#state.heart_timeout_ref),
                    {next_state, wait_first_heart,
                     StateData#state{socket = Socket,
                                     heart_timeout_ref = Ref,
                                     heart_fails = 0}};
                {error, _} ->
                    cancel_timer(Ref),
                    gen_tcp:close(Socket),
                    timer:sleep(?RETRY_PERIOD), % Wait for a while before retrying
                    {next_state, connecting, StateData, 0}
            end;
        {error, _} ->
            timer:sleep(?RETRY_PERIOD),
            {next_state, connecting, StateData, 0}
    end;
%% Any other event. A gen_fsm timeout is discarded as soon as another
%% event is delivered, and the timeout is the only thing that drives a
%% reconnect, so it has to be re-armed here or the FSM would sit in
%% 'connecting' forever.
connecting(_, StateData) ->
    {next_state, connecting, StateData, 0}.
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------

%% A request arrived while the first heartbeat is still outstanding.
%% Refuse it but stay in 'wait_first_heart': the socket is open and the
%% heartbeat reply (or its timeout) is still expected in this state.
wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
    {reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.

%% A request arrived while there is no connection. Refuse it and re-arm
%% the 0 ms timeout: gen_fsm drops a pending timeout whenever an event is
%% delivered, and that timeout is what triggers the next connection attempt.
connecting({apply, _, _, _, _}, _From, StateData) ->
    {reply, {error, not_connected}, connecting, StateData, 0}.

%% Forward the request to the remote side. The reply is deferred: the
%% caller is stored under the command timer reference and answered from
%% handle_info/3 when the response or the cmd_timeout message arrives.
connected({apply, M, F, A, Timeout}, From, StateData) ->
    Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
    Packet = term_to_binary({apply, M, F, A, Ref}),
    case gen_tcp:send(StateData#state.socket, Packet) of
        ok ->
            ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
            {next_state, connected, StateData};
        {error, Reason} ->
            %% Nothing was sent, so no timer and no table entry are needed.
            cancel_timer(Ref),
            {reply, {error, Reason}, connected, StateData}
    end.

%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%%----------------------------------------------------------------------

%% All-state events are ignored. In 'connecting' the reconnect timeout is
%% re-armed because delivering this event cancelled it.
handle_event(_Event, connecting, StateData) ->
    {next_state, connecting, StateData, 0};
handle_event(_Event, StateName, StateData) ->
    {next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------

%% Every all-state sync event is answered with 'ok'. In 'connecting' the
%% reconnect timeout is re-armed because delivering the event cancelled it.
handle_sync_event(_Event, _From, connecting, StateData) ->
    {reply, ok, connecting, StateData, 0};
handle_sync_event(_Event, _From, StateName, StateData) ->
    {reply, ok, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------

%% Waiting for first heartbeat. Received timeout which is the current attempt
handle_info({timeout, Ref, heart_timeout}, wait_first_heart,
            #state{heart_timeout_ref = Ref} = StateData) ->
    gen_tcp:close(StateData#state.socket),
    {next_state, connecting,
     StateData#state{socket = 0, heart_timeout_ref = none}, 0};

%% Waiting for first heartbeat. Received timeout which could just
%% be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _Ref, heart_timeout}, wait_first_heart, StateData) ->
    {next_state, wait_first_heart, StateData};

%% Wow, received some data
handle_info({tcp, Socket, Data}, wait_first_heart,
            #state{socket = Socket} = StateData) ->
    case decode(Data) of
        {heart_reply, Ref} when Ref =:= StateData#state.heart_timeout_ref ->
            %% The heartbeat was answered: stop its timeout timer so it is
            %% not counted as a failure once we are connected.
            cancel_timer(Ref),
            Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
            {next_state, connected,
             StateData#state{heart_ref = Next_heart, heart_timeout_ref = none}};
        _ ->
            %% Anything else at this point is a protocol error: start over.
            gen_tcp:close(Socket),
            cancel_timer(StateData#state.heart_timeout_ref),
            {next_state, connecting,
             StateData#state{socket = 0, heart_timeout_ref = none}, 0}
    end;

%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------

%% A heartbeat was not answered in time. Count it; once the failure
%% counter has reached ?HEART_FAIL_THRESHOLD the connection is dropped.
handle_info({timeout, _Ref, heart_timeout}, connected, StateData) ->
    heart_failure(StateData);

%% Time to send a new heartbeat.
handle_info({timeout, _Ref, send_heart}, connected, StateData) ->
    cancel_timer(StateData#state.heart_timeout_ref), % Just in case
    New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
    Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
    case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat, New_ref})) of
        ok ->
            {next_state, connected,
             StateData#state{heart_timeout_ref = New_ref, heart_ref = Next_heart}};
        {error, _} ->
            %% Nothing was sent, so there is no reply to wait for. The
            %% reference is reset to 'none', the only non-timer value
            %% cancel_timer/1 accepts.
            cancel_timer(New_ref),
            heart_failure(StateData#state{heart_timeout_ref = none,
                                          heart_ref = Next_heart})
    end;

%% Received timeout for a sent command. Handled in every state so that a
%% caller whose command outlives the connection is still answered.
handle_info({timeout, Ref, cmd_timeout}, StateName, StateData) ->
    Cmds = StateData#state.cmds,
    case ets:lookup(Cmds, {cmd, Ref}) of
        [{{cmd, Ref}, Reply_to}] ->
            gen_fsm:reply(Reply_to, {error, timed_out}),
            ets:delete(Cmds, {cmd, Ref});
        [] ->
            ok
    end,
    stay(StateName, StateData);

%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} = StateData) ->
    Cmds = StateData#state.cmds,
    case decode(Data) of
        {reply, Ref, Reply} ->
            case ets:lookup(Cmds, {cmd, Ref}) of
                [{{cmd, Ref}, Reply_to}] ->
                    gen_fsm:reply(Reply_to, Reply),
                    ets:delete(Cmds, {cmd, Ref}),
                    cancel_timer(Ref);
                [] ->
                    %% Unknown or already timed-out command: drop the reply.
                    ok
            end,
            {next_state, connected, StateData};
        {heart_reply, Ref} when Ref =:= StateData#state.heart_timeout_ref ->
            cancel_timer(Ref),
            {next_state, connected,
             StateData#state{heart_fails = 0, heart_timeout_ref = none}};
        _ ->
            {next_state, connected, StateData}
    end;

%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------

%% Ignore Packets received in other states
handle_info({tcp, _, _}, StateName, StateData) ->
    stay(StateName, StateData);

%% The peer closed our socket: fail everything outstanding and reconnect.
handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket} = StateData) ->
    reset_connection(StateData);

handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket} = StateData) ->
    io:format("Tcp_Error: ~p~n", [Reason]),
    stay(StateName, StateData);

handle_info(Info, StateName, StateData) ->
    io:format("Unexpected Info: ~p~n", [Info]),
    stay(StateName, StateData).

%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, _StateData) ->
    ok.

%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Remain in StateName. A gen_fsm timeout is discarded whenever a message
%% is delivered, and in 'connecting' that timeout is the only trigger for
%% the next connection attempt, so it is re-armed there.
stay(connecting, StateData) ->
    {next_state, connecting, StateData, 0};
stay(StateName, StateData) ->
    {next_state, StateName, StateData}.

%% Decode a packet from the peer; anything undecodable becomes the atom
%% 'undecodable', which no protocol clause matches.
%% NOTE(review): binary_to_term/1 is applied to data from the network. The
%% [safe] option is deliberately not used because replies may legitimately
%% carry atoms unknown to this node; only connect to trusted peers.
decode(Data) ->
    try
        binary_to_term(Data)
    catch
        error:badarg -> undecodable
    end.

%% Record one heartbeat failure while connected. Up to
%% ?HEART_FAIL_THRESHOLD failures are tolerated; the next one closes the
%% socket and reconnects.
heart_failure(#state{heart_fails = Fails} = StateData)
  when Fails >= ?HEART_FAIL_THRESHOLD ->
    gen_tcp:close(StateData#state.socket),
    reset_connection(StateData);
heart_failure(#state{heart_fails = Fails} = StateData) ->
    {next_state, connected, StateData#state{heart_fails = Fails + 1}}.

%% Forget the current connection: stop both heartbeat timers, fail every
%% outstanding command and go back to 'connecting' with an immediate
%% connection attempt.
reset_connection(StateData) ->
    cancel_timer(StateData#state.heart_ref),
    cancel_timer(StateData#state.heart_timeout_ref),
    reply_to_all(StateData#state.cmds),
    {next_state, connecting,
     StateData#state{socket = 0, heart_ref = none, heart_timeout_ref = none},
     0}.

%% Cancel a timer started with erlang:start_timer/3 and remove its timeout
%% message from the mailbox in case it already fired. 'none' means that no
%% timer is running.
cancel_timer(none) ->
    ok;
cancel_timer(Ref) ->
    erlang:cancel_timer(Ref),
    receive
        {timeout, Ref, _} -> ok
    after 0 ->
        ok
    end.

%% Answer every caller that still has a command outstanding with
%% {error, closed}, stop the matching command timers and empty the table.
reply_to_all(Ets) ->
    lists:foreach(fun({{cmd, Ref}, Reply_to}) ->
                          cancel_timer(Ref),
                          gen_fsm:reply(Reply_to, {error, closed})
                  end,
                  ets:tab2list(Ets)),
    ets:delete_all_objects(Ets),
    ok.