gen_server, gen_tcp question

Vladimir Sekissov <>
Fri Jul 19 01:58:00 CEST 2002


Good day,

In the attachments is the library where I tried to gather practices I
learned from design of pico, inet, wiki and other packages. Short
description:

tac_listen.erl - base server module;
tac_connect.erl - base client module;
tac_socket_mdr.erl - common mediator between transport, packet and
protocol layers;
tac_tcp_socket.erl - TCP transport
tac_udp_socket.erl - UDP transport

For specific application you need to code packet module and protocol
module. Interface is explained in comments in tac_socket_mdr.erl

Examples from real application.

Server:

...
start_listener(ConfigDB) ->
  Port = tac_util:lookup(ConfigDB, port),
  SKey = tac_util:lookup(ConfigDB, skey),
  MaxConn = tac_util:lookup(ConfigDB, max_connections),
  TrMod = tac_tcp_socket,
  PctFun = tac_packet:make_packer(SKey),
  PrtFun = tacacs_fsm:make_fsm(server, self(), []),
  Args = [{transp_opts, [{packet, 0}, {reuseaddr,true}]}, {max, MaxConn}],
  tac_listen:start_link(Port, TrMod, PctFun, PrtFun, Args).

Client:
	...
  SockPid = spawn_link(tac_connect, start,
		       [Host, Port, TrMod, PctFun, PrtFun, TrArgs]),
  SockPid ! {data, Self, Data},
  Ret = 
    receive
      {return, _, Result} ->
	Result;
      {'EXIT', SockPid, Reson} ->
	{error, Reson}
    end,

May be this code help you. I would be glad to get your advises also.

Best Regards,
Vladimir Sekissov

cyberlync> > Yes. Once a connection has been accepted, you pass
cyberlync> > control of that 
cyberlync> > socket to a server. (And as Kent said, make sure you
cyberlync> > handle possible 
cyberlync> > incoming packets which might be sent to this process
cyberlync> > before control has 
cyberlync> > been passed).
cyberlync> Yes this is a given, my question would be should the
cyberlync> spawn functionality be in the process that accepts the
cyberlync> connection or somewhere else? Basically how simple
cyberlync> should the accept process be kept?
cyberlync> 
cyberlync> > More than spawning another process, you would could
cyberlync> > instead spawn a new 
cyberlync> > OTP behavior. It all depends on the nature of your
cyberlync> > application.
cyberlync>   ..[snip]..
cyberlync> > If you want to allow your server to handle many
cyberlync> > connections, then the 
cyberlync> > answer is yes. Otherwise, if you want one server per
cyberlync> > connection, than 
cyberlync> > you are better off spawning a new behavior.
cyberlync> 
cyberlync> I assume that by new behaviour, you intend to spawn a
cyberlync> supervised process that implements the gen_server
cyberlync> behaviour. This would make sense.
cyberlync> 
cyberlync> Mr. Cesarini, Kent, and everyone else, thanks for all
cyberlync> your help. You have been very generous with a newbie.
cyberlync> 
cyberlync> Thanks,
cyberlync> Eric
cyberlync> 
cyberlync> __________________________________________________
cyberlync> Do You Yahoo!?
cyberlync> Yahoo! Autos - Get free new car price quotes
cyberlync> http://autos.yahoo.com
-------------- next part --------------
%%%----------------------------------------------------------------------
%%% File    : tac_socket_mdr.erl
%%% Author  :  <>
%%% Purpose : Socket mediator between transport and protocol levels
%%% Created :  1 Jul 2002 by  <>
%%%----------------------------------------------------------------------

%%
%% $Id$
%%
%% $Log$
%%
%%**
%% 
%% .* tac_socket_mdr
%% 
%%*
-module(tac_socket_mdr).
-author('').

-export([server/6, client/4]).

-include("dbg.hrl").


%%% Transport module interface
%%
%% listen(Port, Max, TrOpts) -> {ok, ListenPid} | Error
%% accept(ListenPid) -> {ok, Socket} | Error
%% recv(init, Socket, Timeout) -> {ok, InitState} | Error
%% recv(next, Socket, State) -> {ok, Bin, NextState} | Error
%% recv(cancel, Socket, State) -> ok | Error
%% send(Socket, Bin) -> ok | Error
%% close(Socket) -> ok | Error
%% sockname(Socket) -> {ok, {IP, Port}} | Error
%% peername(Socket) -> {ok, {IP, Port}} | Error
%% Error {error, Error}
%%
%%%

%%% Packet function interface
%% PacketFun(Binary, AccBinary) -> Next
%%      initial call to receive next packet
%% Binary currently received block
%% AccBinary rest from previous block
%% Next {more, ContFun, ContState} - get next block and continue with ContFun
%%             ContFun(Binary, ContState)
%%      {done, Packet, RestBin} - packet recieved
%%             Packet - curried to protocol function
%%             RestBin == AccBinary for next PacketFun call
%% PacketFun(pack, Packet) -> Result
%%      pack packet
%% Result {packet, Data}
%%             Data - curried to transport module for sending

%%% Protocol function interface
%% ProtoFun(start, {Socket, TrMod}) -> Next 
%% TrMod - transport module, we can get local/remote peer from Socket here
%% ProtoFun(Event, State) -> Next 
%%  Event start - initial state
%%      {recv, Packet} - Packet received
%%      sended - packet sended
%%      continue,  - previous protocol function requested continuation with
%%                 this function
%% Next {continue, Packet, CntFun, CntState, Timeout} -
%%      send Packet and receive next
%%      {continue, CntPrtFun, CntState} - switch to ContPrtFun
%%      {recv, CntPrtFun, CntState} - receive packet
%%      {send, Packet, CntPrtFun, CntState} - send packet and continue
%%      {done, Packet} - send packet and exit normally
%%      done - normal exit
%%      {error, Error} - abnormal exit with {badproto,Error}
%%

%% start_socket(Parent, LrPid, TrMod, PctFun, PrtFun, AclFun) -> Result
%%  Result nothing | exit(Reson)
%%  Reson normal
%%        | {deny, {RemPeer, LocalPeer}}
%%        | {badproto,Error}
%%        | {bad_proto_fun, Error}
%%        | {badpacket, Error}
%%        | {bad_packet_fun, Error}
%%        | {badsock, Error}
%%  Error term
%% Parent socket manager
%% LrPid listener pid
%% TrMod transport module
%% PctFun packet fun
%% PrtFun protocol fun
%% AclFun ACL fun
server(Parent, LrPid, TrMod, PctFun, PrtFun, AclFun) ->
  case catch TrMod:accept(LrPid) of
    {error, closed} ->
      exit(normal);
    {error, Error} ->
      exit(Error);
    {'EXIT', Error} ->
      exit(Error);
    {ok, Socket} ->
      ask_acl(Socket, TrMod, AclFun),
      tac_listen:new_connect(Parent, self()),
      socket_loop(Socket, TrMod, PctFun, PrtFun)
  end.

client(Socket, TrMod, PctFun, PrtFun) ->
  socket_loop(Socket, TrMod, PctFun, PrtFun).

ask_acl(Socket, TrMod, AclFun) ->
  {ok, LocalPeer} = TrMod:sockname(Socket),
  {ok, RemPeer} = TrMod:peername(Socket),
  case AclFun(RemPeer, LocalPeer) of
    permit ->
      ok;
    _ ->
      exit({deny, {RemPeer, LocalPeer}})
  end.

socket_loop(Socket, TrMod, PctFun, PrtFun) ->
  socket_loop(Socket, TrMod, PctFun, PrtFun, start, {Socket, TrMod}, <<>>).

socket_loop(Socket, TrMod, PctFun, PrtFun, PrtData, PrtState, AccBin) ->
  ?dbg("next ~w with state ~w", [PrtData, PrtState]),
  case catch PrtFun(PrtData, PrtState) of
    {continue, Packet, CntPrtFun, CntState, RecvTimeout} ->
      send_packet(Socket, TrMod, PctFun, Packet),
      {Data, RestBin} =
	recv_packet(Socket, TrMod, PctFun, AccBin, RecvTimeout),
      socket_loop(Socket, TrMod, PctFun, CntPrtFun,
		  {recv, Data}, CntState, RestBin);
    {continue, CntPrtFun, CntState} ->
      socket_loop(Socket, TrMod, PctFun, CntPrtFun, continue, CntState, AccBin);
    {recv, CntPrtFun, CntState, RecvTimeout} ->
      {Data, RestBin} =
	recv_packet(Socket, TrMod, PctFun, AccBin, RecvTimeout),
      socket_loop(Socket, TrMod, PctFun, CntPrtFun,
		  {recv, Data}, CntState, RestBin);
    {send, Packet, CntPrtFun, CntState} ->
      send_packet(Socket, TrMod, PctFun, Packet),
      socket_loop(Socket, TrMod, PctFun, CntPrtFun,
		  sended, CntState, AccBin);
    {done, Packet} ->
      send_packet(Socket, TrMod, PctFun, Packet),
      TrMod:close(Socket);
    done ->
      TrMod:close(Socket);
    {error, Error} ->
      TrMod:close(Socket),
      exit({badproto,Error});
    Other ->
      TrMod:close(Socket),
      exit({bad_proto_fun, Other})
  end.

recv_packet(Socket, TrMod, PctFun, AccBin, Timeout) ->
  TrState = TrMod:recv(init, Socket, Timeout),
  recv_packet_loop(Socket, TrMod, PctFun, AccBin, TrState).

recv_packet_loop(Socket, TrMod, PctFun, PctState, TrState) ->
  case TrMod:recv(next, Socket, TrState) of
    {ok, Bin, NxtState} ->
      case catch PctFun(Bin, PctState) of
        {more, CntFun, CntState} ->
          recv_packet_loop(Socket, TrMod, CntFun, CntState, NxtState);
        {done, Data, RestBin} when binary(RestBin) ->
          TrMod:recv(cancel, Socket, TrState),
          {Data, RestBin};
        {error, Reson} ->
          TrMod:close(Socket),
          exit({badpacket, Reson});
	{'EXIT', Reson} ->
	  TrMod:close(Socket),
	  ?dbg("error ~w", [Reson]),
	  exit({packet_parse, Reson});
        Other ->
          TrMod:close(Socket),
          exit({bad_packet_fun, Other})
      end;
    {error, Reson} ->
      exit({badsock, Reson})
  end.

send_packet(Socket, TrMod, PctFun, Packet) ->
  case catch PctFun(pack, Packet) of
    {packet, Bin} ->
      case TrMod:send(Socket, Bin) of
        ok ->
          ok;
        {error, Reson} ->
          exit({badsock, Reson})
      end;
    {error, Reson} ->
      TrMod:close(Socket),
      exit({badpacket, Reson});
    {'EXIT', Reson} ->
      TrMod:close(Socket),
      ?dbg("error ~w", [Reson]),
      exit({packet_pack, Reson});
    Other ->
      TrMod:close(Socket),
      exit({bad_packet_fun, Other})
  end.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File    : tac_tcp_socket.erl
%%% Author  :  <>
%%% Description : 
%%%
%%% Created : 14 Jun 2002 by  <>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%% 
%% .* tac_udp_socket
%% 
%%*
-module(tac_udp_socket).

-export([listen/3, connect/3, connect/4, info/2, accept/1, close/1]).
-export([send/2, recv/3, sockname/1, peername/1]).

-include("dbg.hrl").

-define(COMMON_TIMEOUT, 5000).

-record(state,{master,            % listen process
               acceptor=false,    % current acceptor
               active=[],         % active connections
               socket,            % socket
               inq = queue:new(), % input queue
               inq_size = 0,      % input queue size
               inq_max}).         % input queue max size
-record(tac_udp_socket, {listen_pid, peer}).

listen(Port, Max, Opts) ->
  DefOpts = [binary, {active, true}],
  Params = DefOpts ++ Opts,
  case  gen_udp:open(Port, Params) of
    {ok, Socket} ->
      Self = self(),
      LrPid = spawn_link(fun () -> start_listener(Self, Socket, Max) end),
      gen_udp:controlling_process(Socket, LrPid),
      LrPid ! {start, Self},
      {ok, LrPid};
    Error ->
      Error
  end.

%% Start listener on any port, register ourselves as acceptor
%% and send dummy package to listener to emulate connection
%% so we can use the same interface as client
%% connect/4 is only for compatibility with common interface
%% connection timeout is never used
connect(Addr, Port, Opts) ->
  connect(Addr, Port, Opts, none).

connect(Addr, Port, Opts, _Timeout) ->
  {ok, LrPid} = listen(0, 1, Opts),
  {ok, IP} = inet:getaddr(Addr, inet),
  LrPid ! {udp, false, IP, Port, connect},
  {ok, Socket} = accept(LrPid),
  receive
    {recv, LrPid, connect} ->
      {ok, Socket}
  end.

accept(LrPid) ->
  LrPid ! {accept, self()},
  receive
    {connected, LrPid, Socket} ->
      {ok, Socket};
    {error, LrPid, Reson} ->
      {error, Reson}
  end.

close(Pid) when pid(Pid) ->
  exit(Pid, normal);  
close(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}) ->
  LrPid ! {close, self(), Peer},
  receive
    {closed, LrPid} ->
      ok;
    {error, LrPid, Reson} ->
      {error, Reson}
  end.

send(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}, Bin) ->
  LrPid ! {send, self(), Peer, Bin},
  receive
    Ret={sended, LrPid} ->
      ok;
    {error, LrPid, Reson} ->
      {error, Reson}
  end.

recv(init, _S, Timeout) when integer(Timeout);
                            Timeout == infinity ->
  recv(init, _S, {0, Timeout});

recv(init, _S, RS={Size, Timeout}) when integer(Size),
					Size >= 0 ->
  RS;
recv(next, S, State) ->
  do_recv(S, State, <<>>);
recv(cancel, _S, _State) ->
  ok.

info(S=#tac_udp_socket{listen_pid=LrPid}, Query) ->
  info(LrPid, Query);
info(LrPid, Query) when pid(LrPid) ->
  LrPid ! {info, self(), Query},
  receive
    {info, LrPid, Answer} ->
      {ok, Answer}
  after ?COMMON_TIMEOUT ->
      {error, timeout}
  end.

start_listener(Serv, Socket, Max) ->
  receive
    {start, Serv} ->
      process_flag(trap_exit, true),
      listener_loop(#state{socket=Socket, master=Serv, inq_max=Max})
  end.

listener_loop(S=#state{master=Mr,
                       acceptor=Current, active=Active,
                       socket=Sock}) ->
  Self = self(),
  receive
    {sockname, From} ->
      From ! {sockname, Self, inet:sockname(Sock)},
      listener_loop(S);
    {accept, Pid} ->
      set_acceptor(Pid, S);
    {close, From, Peer} ->
      close_socket(From, Peer, S);
    {send, From, Peer, Data} ->
      send_to_socket(From, Peer, Data, S);
    {recv, From, Peer} ->
      recv_from_socket(From, Peer, S);
    {info, From, state} ->
      From ! {info, Self, S},
      listener_loop(S);
    {'EXIT', Mr, Reson} ->
      exit(Reson);
    {'EXIT', Self, Reson} ->
      exit(Reson);
    {'EXIT', Current, Reson} ->
      listener_loop(S#state{acceptor=false});
    {'EXIT', Pid, Reson} ->
      listener_loop(S#state{active=active_delete(Pid, Active)});
    {udp, _S, IP, Port, Data} ->
      dispatch_packet({IP, Port}, Data, S);
    Other ->
      {ok, Peer} = inet:sockname(Sock),
      error_logger:format("~w:~p Unexpected message: ~p~n",
			  [?MODULE, Peer, Other]),
      listener_loop(S)
  end.

set_acceptor(Pid, S=#state{socket=Sock, acceptor=false}) ->
%  inet:setopts(Sock, [{active, once}]),
  link(Pid),
  new_connection(S#state{acceptor=Pid});
set_acceptor(Pid, S=#state{acceptor=Current}) when pid(Current) ->
  Pid ! {error, self(), already_exist},
  listener_loop(S).

dispatch_packet(Peer, Data, S=#state{active=Active}) ->
  case active_lookup(Peer, Active) of
    undefined ->
      new_connection(Peer, Data, S);
    {Pid, _} ->
      Pid ! {recv, self(), Data},
      listener_loop(S)
  end.

new_connection(S=#state{acceptor=Pid, inq=Q, inq_size=QSz}) when pid(Pid) ->
  case queue:out(Q) of
    {empty, NQ} -> %input queue is empty
      listener_loop(S);
    {{value, {Peer, Data}}, NQ} ->
      new_connection(Peer, Data, S#state{inq=NQ, inq_size=QSz - 1})
  end.

new_connection(Peer, Data,
	       S=#state{acceptor=false, inq=Q, inq_size=QSz, inq_max=Max}) ->
  if QSz >= Max ->
      listener_loop(S); % no active acceptors, queue is full - discard
     true -> % put in queue
      listener_loop(S#state{inq=queue:in({Peer, Data}, Q), inq_size=QSz+1})
  end;
new_connection(Peer, Data,
               S=#state{master=Mr, acceptor=Current, active=Active}) ->
  NewSocket = #tac_udp_socket{listen_pid=self(), peer=Peer},
  Current ! {connected, self(), NewSocket},
  Current ! {recv, self(), Data},
  listener_loop(S#state{acceptor=false,
                        active=active_add({Current, Peer}, Active)}).

send_to_socket(Pid, Peer={IP, Port}, Data,
               S=#state{socket=Sock}) ->
  case gen_udp:send(Sock, IP, Port, Data) of
    ok ->
      Pid ! {sended, self()};
    {error, Reson} ->
      Pid ! {error, self(), Reson}
  end,
  listener_loop(S).

recv_from_socket(From, Peer,
                 S=#state{socket=Sock}) ->
%  inet:setopts(Sock, [{active, once}]),
  listener_loop(S).

close_socket(Pid, Peer, S=#state{active=Active}) ->
  NewActive = active_delete({Pid, Peer}, Active),
  if length(NewActive) == length(Active) ->
      Pid ! {error, self(), not_exist};
     true ->
      Pid ! {closed, self()}
  end,
  listener_loop(S#state{active=NewActive}).

sockname(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}) ->
  LrPid ! {sockname, self()},
  receive
    {sockname, LrPid, Ret} ->
      Ret;
    {error, LrPid, Reson} ->
      {error, Reson}
  end.

peername(S=#tac_udp_socket{peer=Peer}) ->
  {ok, Peer}.

do_recv(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer},
	State = {Size, Timeout}, AccBin) ->
  receive
    {recv, LrPid, RecvBin} ->
      Bin = erlang:concat_binary([AccBin, RecvBin]),
      BinSize = size(Bin),
      if (Size == 0) or (BinSize == Size) ->
	  {ok, Bin, State};
	 BinSize < Size ->
	  do_recv(S, State, Bin);
	 true ->
	  {RetBin, _} = erlang:split_binary(Bin, Size),
	  {ok, RetBin, State}
      end;
    {error, LrPid, Reson} ->
      {error, Reson}
  after Timeout ->
      {error, timeout}
  end.

active_add(New, Active) ->
  [New|Active].

active_delete(Pid, Active) when pid(Pid) ->
  [Conn || Conn <- Active, element(1, Conn) =/= Pid];
active_delete(Conn, Active) ->
  lists:delete(Conn, Active).

active_lookup(Pid, Active) when pid(Pid) ->
  case lists:keysearch(Pid, 1, Active) of
    {value, V} ->
      V;
    _ ->
      undefined
  end;
active_lookup(Peer={_IP, _Port}, Active) ->
  case lists:keysearch(Peer, 2, Active) of
    {value, V} ->
      V;
    _ ->
      undefined
  end.

%%start_timer(infinity, _) ->
%%  infinity;
%%start_timer(Timeout, Msg) ->
%%  erlang:start_timer(Timeout, self(), Msg).
%%
%%cancel_timer(infinity) ->
%%    ok;
%%cancel_timer(Ref) ->
%%    erlang:cancel_timer(Ref),
%%    receive
%%	{timeout, Ref, _} ->
%%	    ok
%%    after 0 ->
%%	    ok
%%    end.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File    : tac_tcp_socket.erl
%%% Author  :  <>
%%% Description : 
%%%
%%% Created : 14 Jun 2002 by  <>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%% 
%% .* tac_tcp_socket
%% 
%%*
-module(tac_tcp_socket).

-export([listen/3, connect/3, connect/4, accept/1, close/1]).
-export([send/2, recv/3, sockname/1, peername/1]).

-include("dbg.hrl").

-record(state, {socket, master}).

listen(Port, Max, Opts) ->
  DefOpts = [binary, {active, false}],
  BackLog = if is_integer(Max) ->
		[{backlog, Max}];
	       true ->
		[]
	    end,
  Params = BackLog ++ DefOpts ++ Opts,
  case  gen_tcp:listen(Port, Params) of
    {ok, Socket} ->
      Self = self(),
      LrPid = spawn_link(fun () -> start_listener(Self, Socket) end),
      gen_tcp:controlling_process(Socket, LrPid),
      LrPid ! {start, Self},
      {ok, LrPid};
    Error ->
      Error
  end.

connect(Addr, Port, Opts) ->
  connect(Addr, Port, Opts, infinity).

connect(Addr, Port, Opts, Timeout) ->
  {ok, IP} = inet:getaddr(Addr, inet),
  gen_tcp:connect(IP, Port, [binary, {packet, 0}, {active, false}|Opts], Timeout).

accept(LrPid) when pid(LrPid) ->
  gen_tcp:accept(get_socket(LrPid)).

close(Pid) when pid(Pid) ->
  exit(Pid, normal);
close(S) ->
  gen_tcp:close(S).

send(S, Bin) ->
  case gen_tcp:send(S, Bin) of
    ok ->
      ok;
    Error ->
      close(S),
      Error
  end.

sockname(Pid) when pid(Pid)->
  sockname(get_socket(Pid));
sockname(S) ->
  inet:sockname(S).

peername(S) ->
  inet:peername(S).

get_socket(LrPid) ->
  LrPid ! {get_socket, self()},
  receive
    {socket, LrPid, Socket} ->
      Socket
  end.

recv(init, _S, Timeout) when integer(Timeout);
                            Timeout == infinity ->
  recv(init, _S, {0, Timeout});

recv(init, _S, RS={Size, Timeout}) when integer(Size);
                                       Size >= 0 ->
  RS;
recv(next, S, {Size, Timeout}) ->
  do_recv(S, Size, Timeout);
recv(cancel, _S, _RS) ->
  ok.

%% Internal funs

do_recv(S, Size, Timeout) ->
  NxtState = {Size, Timeout},
  case gen_tcp:recv(S, Size, Timeout) of
    {ok, Data} ->
      {ok, Data, NxtState};
    Error={error, timeout} ->
      Error;
    Error={error, Reson} ->
      gen_tcp:close(S),
      Error;
    Other ->
      gen_tcp:close(S),
      {error, Other}
  end.

start_listener(Serv, Socket) ->
  receive
    {start, Serv} ->
      process_flag(trap_exit, true),
      listener_loop(#state{socket=Socket, master=Serv})
  end.

listener_loop(S=#state{master=Mr, socket=Sock}) ->
  Self = self(),
  receive
    {get_socket, Pid} ->
      Pid ! {socket, Self, Sock},
      listener_loop(S);
    {'EXIT', Self, Reson} ->
      close(Sock),
      exit(Reson);
    {'EXIT', Mr, Reson} ->
      close(Sock),
      exit(Reson);
    Other ->
      {ok, Peer} = inet:sockname(Sock),
      error_logger:format("~w:~p Unexpected message: ~p~n",
			  [?MODULE, Peer, Other]),
      listener_loop(S)
  end.

%%recv(init, S, Timeout) when integer(Timeout);
%%                             Timeout == none ->
%%  Ref = start_timer(Timeout, recv_timeout),
%%  Ref;
%%recv(next, S, Ref) ->
%%  do_recv(S, Ref);
%%recv(cancel, S, Ref) ->
%%  cancel_timer(Ref).
%%
%%do_recv(S, Ref) ->
%%  inet:setopts(S, [{active, once}]),
%%  receive
%%    {tcp, _P, Bin} ->
%%      {ok, Bin, Ref};
%%    {timeout, Ref, recv_timeout} ->
%%      close(S),
%%      {error, timeout};
%%    {tcp_closed, P} ->
%%      close(P),
%%      cancel_timer(Ref),
%%      {error, closed};
%%    {tcp_error, P, Reson} ->
%%      close(P),
%%      cancel_timer(Ref),
%%      {error, Reson}
%%  end.
%%
%%start_timer(none, _) ->
%%  none;
%%start_timer(Timeout, Msg) ->
%%  erlang:start_timer(Timeout, self(), Msg).
%%
%%cancel_timer(none) ->
%%    ok;
%%cancel_timer(Ref) ->
%%    erlang:cancel_timer(Ref),
%%    receive
%%	{timeout, Ref, _} ->
%%	    ok
%%    after 0 ->
%%	    ok
%%    end.

-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File    : tac_connect.erl
%%% Author  :  <>
%%% Description : TCP/IP/UDP client
%%%
%%% Created : 12 Jun 2002 by  <>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%% 
%% .* tac_connect
%% 
%%*

-module(tac_connect).

-export([start/6]).

-include("dbg.hrl").

start(Addr, Port, TrMod, PctFun, PrtFun, Args) ->
  [Timeout, TrOpts] = [getopt:value(Key, Opts)
		       || Key <- [timeout, transp_opts],
			  Opts <- [check_opts(Args)]],
  case TrMod:connect(Addr, Port, TrOpts, Timeout) of
    {ok, Socket} ->
      tac_socket_mdr:client(Socket, TrMod, PctFun, PrtFun);
    {error, Error} ->
      exit(Error);
    {'EXIT', Error} ->
      exit(Error)
  end.

check_opts(Args) ->
  ChkTimeout = fun (N) when integer(N), N > 0 ->
		   {ok, N};
		   (N=infinity) ->
		   {ok, N}
	       end,
  Opts = getopt:options(Args,
                        [
                         {timeout, infinity, ChkTimeout},
			 {transp_opts, [], list}
                        ], defined_strict),
  Opts.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File    : tac_listen.erl
%%% Author  :  <>
%%% Description : TCP/IP/UDP server, accepts connections and spawns protocol
%%%               and transport modules
%%%
%%% Created : 12 Jun 2002 by  <>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%% 
%% .* tac_listen
%% 
%%*
-module(tac_listen).

-behaviour(gen_server).
%%--------------------------------------------------------------------
%% Include files
%%--------------------------------------------------------------------

%%--------------------------------------------------------------------
%% External exports
-export([start_link/5, start_link/6, stop/1, info/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% Internal exports
-export([new_connect/2]).

-include("dbg.hrl").

-record(state, {listen_pid,              % Listener pid
                acceptor,                %Current acceptor
                active=[],               %Active connections
                max_conn=infinity,       %Max connections number
                transp_mod,              %Transport module
                packet_fun,              %Packet fun
                proto_fun,               %Protocol fun
                acl_fun=fun allow_all/2  % ACL function to permit/deny connection
               }).

-ifndef(DEBUG).
-define(DEBUG, 1).
-endif.

-ifdef(DEBUG).
-define(DEBUG_TAC_LISTEN, [debug]).
-else.
-define(DEBUG_TAC_LISTEN, []).
-endif. %DEBUG

-ifndef(TAC_LISTEN_TIMEOUT).
-define(TAC_LISTEN_TIMEOUT, 30000).
-endif.

-define(MAX_CONNECTIONS, 256).

-define(SERVER(P),
        list_to_atom(lists:flatten(io_lib:format(?MODULE_STRING "_~w",[P])))).

%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Port, TrMod, PctFun, PrtFun, Args) ->
  start_link(?SERVER(Port), Port, TrMod, PctFun, PrtFun, Args).

start_link(Name, Port, TrMod, PctFun, PrtFun, Args) ->
  gen_server:start_link({local, Name},
                        ?MODULE, {Port, TrMod, PctFun, PrtFun, Args},
                        ?DEBUG_TAC_LISTEN).

stop(Port) when integer(Port) ->
  stop(?SERVER(Port));
stop(Name) ->
  destroy(whereis(Name)).

info(Port, Query) when integer(Port) ->
  info(?SERVER(Port), Query);
info(Server, Query) ->
  gen_server:call(Server, {info, Query}).

%%====================================================================
%% Server functions
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%--------------------------------------------------------------------
init({Port, TrMod, PctFun, PrtFun, Args}) ->
  [Max, AclFun, TrOpts] = [getopt:value(Key, Opts)
                   || Key <- [max, acl_fun, transp_opts],
		      Opts <- [check_opts(Args)]],
  process_flag(trap_exit, true),
  case TrMod:listen(Port, Max, TrOpts) of
    {ok, LrPid} ->
      Pid = start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun),
      ?dbg("started at port ~w",[Port]),
      {ok, #state{listen_pid = LrPid,
                  acceptor = Pid,
                  transp_mod = TrMod,
                  proto_fun = PrtFun,
                  packet_fun = PctFun,
                  acl_fun = AclFun,
                  max_conn = Max
                 }};
    {error, Reason} ->
      {stop, Reason};
    Other ->
      {stop, Other}
  end.

%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call({info, state}, From,
            S=#state{listen_pid=LrPid,
                     acceptor=Acceptor,
                     active=Active,
                     max_conn=Max,
                     transp_mod=TrMod
                    }) ->
  Reply = [{listen_pid, LrPid},
           {acceptor, Acceptor},
           {transp_mod, TrMod},
           {active, length(Active)},
           {max_conn, Max}
          ],
  {reply, Reply, S};
handle_call({info, active_pids}, From, S=#state{active=A}) ->
  {reply, A, S};
handle_call(stop, From, State) ->
  {stop, normal, ok, State};
handle_call(Request, From, State) ->
  Reply = {bad_request, Request},
  {reply, Reply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast({connected, Current},
            S=#state{acceptor=Current, active=As, max_conn=Max, transp_mod=TrMod,
                     listen_pid=LrPid, packet_fun=PcF, proto_fun=PrF, acl_fun=AcF}) ->
  AsLen = length(As),
  ?dbg("new connection ~p, connections ~w", [Current, AsLen]),
  New = another_accept(LrPid, TrMod, PcF, PrF, AcF, AsLen, Max, false),
  {noreply, S#state{acceptor=New, active=[Current|As]}};
handle_cast(_, State) ->
  {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({'EXIT', LrPid, Reson}, S=#state{listen_pid=LrPid}) ->
  ?dbg("listener ~p exited, reson ~p", [LrPid, Reson]),
  {stop, Reson, S#state{listen_pid=none}};
handle_info({'EXIT', Current, Reson},
            S=#state{listen_pid=LrPid, acceptor=Current,
                     active=As, max_conn=Max,
                     transp_mod=TM, proto_fun=PrF, packet_fun=PcF,
                     acl_fun=AcF}) ->
  AsLen = length(As),
  ?dbg("acceptor ~p exited, reson ~p, connections ~w", [Current, Reson,AsLen]),
  New = another_accept(LrPid, TM, PcF, PrF, AcF, AsLen, Max, false),
  {noreply, S#state{acceptor=New}};
handle_info({'EXIT', Pid, Reson},
            S=#state{listen_pid=LrPid, acceptor=A, active=As, max_conn=Max,
                     transp_mod=TM, proto_fun=PrF, packet_fun=PcF,
                     acl_fun=AcF})  ->
  AsLen = length(As),
  As1 = lists:delete(Pid, As),
  AsLen1 = length(As1),

  if AsLen == AsLen1 ->
      {noreply, S};
     true ->
      ?dbg("session ~p exited, reson ~p, connections ~w",
        [Pid, Reson, AsLen1]),
      New = another_accept(LrPid, TM, PcF, PrF, AcF, AsLen1, Max, A),
      {noreply, S#state{acceptor=New, active=As1}}
  end;
handle_info(Info, S=#state{listen_pid=LrPid, transp_mod=TM}) ->
  Port = case TM:sockname(LrPid) of
           {ok, {_, P}} ->
             P;
           _Error ->
             unknown
         end,
  error_logger:format("~w Unexpected info: ~p~n",
		      [process_info(self(),registered_name) , Info]),
  {noreply, S};
handle_info(Info, S) ->
  {noreply, S}.

%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(Reason, S=#state{listen_pid=LrPid, transp_mod=TrMod,
			   acceptor=Current, active=Active}) ->
  case LrPid of
    _ when pid(LrPid) ->
      TrMod:close(LrPid);
    _ ->
      ok
  end,
  Children = 
    case Current of
      _ when pid(Current) ->
	[Current|Active];
      _ ->
	Active
    end,
  lists:foreach(fun (P) -> exit(P, stop) end, Children),
  ?dbg("Terminating", []),
  ok.

%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%--------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
  {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

destroy(Server) when pid(Server) ->
  gen_server:call(Server, stop, ?TAC_LISTEN_TIMEOUT);
destroy(Other) ->
  {error, Other}.

start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun) ->
  Self = self(),
  spawn_link(tac_socket_mdr, server,
	     [Self, LrPid, TrMod, PctFun, PrtFun, AclFun]).

%% return new acceptor
another_accept(LrPid, TrMod, PctFun, PrtFun, AclFun, Active, Max, false) ->
  if Active < Max -> % lower than limit
      start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun);
     true ->
      ?dbg("blocking accept, active ~w, max ~w", [Active, Max]),
      false
  end;
another_accept(_LrPid, _TM, _PcF, _PrF, _AF, _As, _Max, Acceptor) ->
  Acceptor.

new_connect(Server, New) ->
  gen_server:cast(Server, {connected, New}).

check_opts(Args) ->
  ChkMax = fun (N) when integer(N), N > 0 ->
               {ok, N};
               (N=infinity) ->
               {ok, N}
           end,
  Opts = getopt:options(Args,
                        [
                         {acl_fun, fun allow_all/2, function},
                         {max, ?MAX_CONNECTIONS, ChkMax},
			 {transp_opts, [], list}
                        ], defined_strict),
  Opts.

allow_all(RemotePeer, LocalPeer) ->
  permit.


More information about the erlang-questions mailing list