gen_server, gen_tcp question
Vladimir Sekissov
vladimir.sekissov@REDACTED
Fri Jul 19 01:58:00 CEST 2002
Good day,
In the attachments is the library where I tried to gather practices I
learned from design of pico, inet, wiki and other packages. Short
description:
tac_listen.erl - base server module;
tac_connect.erl - base client module;
tac_socket_mdr.erl - common mediator between transport, packet and
protocol layers;
tac_tcp_socket.erl - TCP transport
tac_udp_socket.erl - UDP transport
For specific application you need to code packet module and protocol
module. Interface is explained in comments in tac_socket_mdr.erl
Examples from real application.
Server:
...
start_listener(ConfigDB) ->
Port = tac_util:lookup(ConfigDB, port),
SKey = tac_util:lookup(ConfigDB, skey),
MaxConn = tac_util:lookup(ConfigDB, max_connections),
TrMod = tac_tcp_socket,
PctFun = tac_packet:make_packer(SKey),
PrtFun = tacacs_fsm:make_fsm(server, self(), []),
Args = [{transp_opts, [{packet, 0}, {reuseaddr,true}]}, {max, MaxConn}],
tac_listen:start_link(Port, TrMod, PctFun, PrtFun, Args).
Client:
...
SockPid = spawn_link(tac_connect, start,
[Host, Port, TrMod, PctFun, PrtFun, TrArgs]),
SockPid ! {data, Self, Data},
Ret =
receive
{return, _, Result} ->
Result;
{'EXIT', SockPid, Reson} ->
{error, Reson}
end,
May be this code help you. I would be glad to get your advises also.
Best Regards,
Vladimir Sekissov
cyberlync> > Yes. Once a connection has been accepted, you pass
cyberlync> > control of that
cyberlync> > socket to a server. (And as Kent said, make sure you
cyberlync> > handle possible
cyberlync> > incoming packets which might be sent to this process
cyberlync> > before control has
cyberlync> > been passed).
cyberlync> Yes this is a given, my question would be should the
cyberlync> spawn functionality be in the process that accepts the
cyberlync> connection or somewhere else? Basically how simple
cyberlync> should the accept process be kept?
cyberlync>
cyberlync> > More than spawning another process, you would could
cyberlync> > instead spawn a new
cyberlync> > OTP behavior. It all depends on the nature of your
cyberlync> > application.
cyberlync> ..[snip]..
cyberlync> > If you want to allow your server to handle many
cyberlync> > connections, then the
cyberlync> > answer is yes. Otherwise, if you want one server per
cyberlync> > connection, than
cyberlync> > you are better off spawning a new behavior.
cyberlync>
cyberlync> I assume that by new behaviour, you intend to spawn a
cyberlync> supervised process that implements the gen_server
cyberlync> behaviour. This would make sense.
cyberlync>
cyberlync> Mr. Cesarini, Kent, and everyone else, thanks for all
cyberlync> your help. You have been very generous with a newbie.
cyberlync>
cyberlync> Thanks,
cyberlync> Eric
cyberlync>
cyberlync> __________________________________________________
cyberlync> Do You Yahoo!?
cyberlync> Yahoo! Autos - Get free new car price quotes
cyberlync> http://autos.yahoo.com
-------------- next part --------------
%%%----------------------------------------------------------------------
%%% File : tac_socket_mdr.erl
%%% Author : <svg@REDACTED>
%%% Purpose : Socket mediator between transport and protocol levels
%%% Created : 1 Jul 2002 by <svg@REDACTED>
%%%----------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%%
%% .* tac_socket_mdr
%%
%%*
-module(tac_socket_mdr).
-author('svg@REDACTED').
-export([server/6, client/4]).
-include("dbg.hrl").
%%% Transport module interface
%%
%% listen(Port, Max, TrOpts) -> {ok, ListenPid} | Error
%% accept(ListenPid) -> {ok, Socket} | Error
%% recv(init, Socket, Timeout) -> {ok, InitState} | Error
%% recv(next, Socket, State) -> {ok, Bin, NextState} | Error
%% recv(cancel, Socket, State) -> ok | Error
%% send(Socket, Bin) -> ok | Error
%% close(Socket) -> ok | Error
%% sockname(Socket) -> {ok, {IP, Port}} | Error
%% peername(Socket) -> {ok, {IP, Port}} | Error
%% Error {error, Error}
%%
%%%
%%% Packet function interface
%% PacketFun(Binary, AccBinary) -> Next
%% initial call to receive next packet
%% Binary currently received block
%% AccBinary rest from previous block
%% Next {more, ContFun, ContState} - get next block and continue with ContFun
%% ContFun(Binary, ContState)
%% {done, Packet, RestBin} - packet recieved
%% Packet - curried to protocol function
%% RestBin == AccBinary for next PacketFun call
%% PacketFun(pack, Packet) -> Result
%% pack packet
%% Result {packet, Data}
%% Data - curried to transport module for sending
%%% Protocol function interface
%% ProtoFun(start, {Socket, TrMod}) -> Next
%% TrMod - transport module, we can get local/remote peer from Socket here
%% ProtoFun(Event, State) -> Next
%% Event start - initial state
%% {recv, Packet} - Packet received
%% sended - packet sended
%% continue, - previous protocol function requested continuation with
%% this function
%% Next {continue, Packet, CntFun, CntState, Timeout} -
%% send Packet and receive next
%% {continue, CntPrtFun, CntState} - switch to ContPrtFun
%% {recv, CntPrtFun, CntState} - receive packet
%% {send, Packet, CntPrtFun, CntState} - send packet and continue
%% {done, Packet} - send packet and exit normally
%% done - normal exit
%% {error, Error} - abnormal exit with {badproto,Error}
%%
%% start_socket(Parent, LrPid, TrMod, PctFun, PrtFun, AclFun) -> Result
%% Result nothing | exit(Reson)
%% Reson normal
%% | {deny, {RemPeer, LocalPeer}}
%% | {badproto,Error}
%% | {bad_proto_fun, Error}
%% | {badpacket, Error}
%% | {bad_packet_fun, Error}
%% | {badsock, Error}
%% Error term
%% Parent socket manager
%% LrPid listener pid
%% TrMod transport module
%% PctFun packet fun
%% PrtFun protocol fun
%% AclFun ACL fun
server(Parent, LrPid, TrMod, PctFun, PrtFun, AclFun) ->
case catch TrMod:accept(LrPid) of
{error, closed} ->
exit(normal);
{error, Error} ->
exit(Error);
{'EXIT', Error} ->
exit(Error);
{ok, Socket} ->
ask_acl(Socket, TrMod, AclFun),
tac_listen:new_connect(Parent, self()),
socket_loop(Socket, TrMod, PctFun, PrtFun)
end.
client(Socket, TrMod, PctFun, PrtFun) ->
socket_loop(Socket, TrMod, PctFun, PrtFun).
ask_acl(Socket, TrMod, AclFun) ->
{ok, LocalPeer} = TrMod:sockname(Socket),
{ok, RemPeer} = TrMod:peername(Socket),
case AclFun(RemPeer, LocalPeer) of
permit ->
ok;
_ ->
exit({deny, {RemPeer, LocalPeer}})
end.
socket_loop(Socket, TrMod, PctFun, PrtFun) ->
socket_loop(Socket, TrMod, PctFun, PrtFun, start, {Socket, TrMod}, <<>>).
socket_loop(Socket, TrMod, PctFun, PrtFun, PrtData, PrtState, AccBin) ->
?dbg("next ~w with state ~w", [PrtData, PrtState]),
case catch PrtFun(PrtData, PrtState) of
{continue, Packet, CntPrtFun, CntState, RecvTimeout} ->
send_packet(Socket, TrMod, PctFun, Packet),
{Data, RestBin} =
recv_packet(Socket, TrMod, PctFun, AccBin, RecvTimeout),
socket_loop(Socket, TrMod, PctFun, CntPrtFun,
{recv, Data}, CntState, RestBin);
{continue, CntPrtFun, CntState} ->
socket_loop(Socket, TrMod, PctFun, CntPrtFun, continue, CntState, AccBin);
{recv, CntPrtFun, CntState, RecvTimeout} ->
{Data, RestBin} =
recv_packet(Socket, TrMod, PctFun, AccBin, RecvTimeout),
socket_loop(Socket, TrMod, PctFun, CntPrtFun,
{recv, Data}, CntState, RestBin);
{send, Packet, CntPrtFun, CntState} ->
send_packet(Socket, TrMod, PctFun, Packet),
socket_loop(Socket, TrMod, PctFun, CntPrtFun,
sended, CntState, AccBin);
{done, Packet} ->
send_packet(Socket, TrMod, PctFun, Packet),
TrMod:close(Socket);
done ->
TrMod:close(Socket);
{error, Error} ->
TrMod:close(Socket),
exit({badproto,Error});
Other ->
TrMod:close(Socket),
exit({bad_proto_fun, Other})
end.
recv_packet(Socket, TrMod, PctFun, AccBin, Timeout) ->
TrState = TrMod:recv(init, Socket, Timeout),
recv_packet_loop(Socket, TrMod, PctFun, AccBin, TrState).
recv_packet_loop(Socket, TrMod, PctFun, PctState, TrState) ->
case TrMod:recv(next, Socket, TrState) of
{ok, Bin, NxtState} ->
case catch PctFun(Bin, PctState) of
{more, CntFun, CntState} ->
recv_packet_loop(Socket, TrMod, CntFun, CntState, NxtState);
{done, Data, RestBin} when binary(RestBin) ->
TrMod:recv(cancel, Socket, TrState),
{Data, RestBin};
{error, Reson} ->
TrMod:close(Socket),
exit({badpacket, Reson});
{'EXIT', Reson} ->
TrMod:close(Socket),
?dbg("error ~w", [Reson]),
exit({packet_parse, Reson});
Other ->
TrMod:close(Socket),
exit({bad_packet_fun, Other})
end;
{error, Reson} ->
exit({badsock, Reson})
end.
send_packet(Socket, TrMod, PctFun, Packet) ->
case catch PctFun(pack, Packet) of
{packet, Bin} ->
case TrMod:send(Socket, Bin) of
ok ->
ok;
{error, Reson} ->
exit({badsock, Reson})
end;
{error, Reson} ->
TrMod:close(Socket),
exit({badpacket, Reson});
{'EXIT', Reson} ->
TrMod:close(Socket),
?dbg("error ~w", [Reson]),
exit({packet_pack, Reson});
Other ->
TrMod:close(Socket),
exit({bad_packet_fun, Other})
end.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File : tac_tcp_socket.erl
%%% Author : <svg@REDACTED>
%%% Description :
%%%
%%% Created : 14 Jun 2002 by <svg@REDACTED>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%%
%% .* tac_udp_socket
%%
%%*
-module(tac_udp_socket).
-export([listen/3, connect/3, connect/4, info/2, accept/1, close/1]).
-export([send/2, recv/3, sockname/1, peername/1]).
-include("dbg.hrl").
-define(COMMON_TIMEOUT, 5000).
-record(state,{master, % listen process
acceptor=false, % current acceptor
active=[], % active connections
socket, % socket
inq = queue:new(), % input queue
inq_size = 0, % input queue size
inq_max}). % input queue max size
-record(tac_udp_socket, {listen_pid, peer}).
listen(Port, Max, Opts) ->
DefOpts = [binary, {active, true}],
Params = DefOpts ++ Opts,
case gen_udp:open(Port, Params) of
{ok, Socket} ->
Self = self(),
LrPid = spawn_link(fun () -> start_listener(Self, Socket, Max) end),
gen_udp:controlling_process(Socket, LrPid),
LrPid ! {start, Self},
{ok, LrPid};
Error ->
Error
end.
%% Start listener on any port, register ourselves as acceptor
%% and send dummy package to listener to emulate connection
%% so we can use the same interface as client
%% connect/4 is only for compatibility with common interface
%% connection timeout is never used
connect(Addr, Port, Opts) ->
connect(Addr, Port, Opts, none).
connect(Addr, Port, Opts, _Timeout) ->
{ok, LrPid} = listen(0, 1, Opts),
{ok, IP} = inet:getaddr(Addr, inet),
LrPid ! {udp, false, IP, Port, connect},
{ok, Socket} = accept(LrPid),
receive
{recv, LrPid, connect} ->
{ok, Socket}
end.
accept(LrPid) ->
LrPid ! {accept, self()},
receive
{connected, LrPid, Socket} ->
{ok, Socket};
{error, LrPid, Reson} ->
{error, Reson}
end.
close(Pid) when pid(Pid) ->
exit(Pid, normal);
close(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}) ->
LrPid ! {close, self(), Peer},
receive
{closed, LrPid} ->
ok;
{error, LrPid, Reson} ->
{error, Reson}
end.
send(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}, Bin) ->
LrPid ! {send, self(), Peer, Bin},
receive
Ret={sended, LrPid} ->
ok;
{error, LrPid, Reson} ->
{error, Reson}
end.
recv(init, _S, Timeout) when integer(Timeout);
Timeout == infinity ->
recv(init, _S, {0, Timeout});
recv(init, _S, RS={Size, Timeout}) when integer(Size),
Size >= 0 ->
RS;
recv(next, S, State) ->
do_recv(S, State, <<>>);
recv(cancel, _S, _State) ->
ok.
info(S=#tac_udp_socket{listen_pid=LrPid}, Query) ->
info(LrPid, Query);
info(LrPid, Query) when pid(LrPid) ->
LrPid ! {info, self(), Query},
receive
{info, LrPid, Answer} ->
{ok, Answer}
after ?COMMON_TIMEOUT ->
{error, timeout}
end.
start_listener(Serv, Socket, Max) ->
receive
{start, Serv} ->
process_flag(trap_exit, true),
listener_loop(#state{socket=Socket, master=Serv, inq_max=Max})
end.
listener_loop(S=#state{master=Mr,
acceptor=Current, active=Active,
socket=Sock}) ->
Self = self(),
receive
{sockname, From} ->
From ! {sockname, Self, inet:sockname(Sock)},
listener_loop(S);
{accept, Pid} ->
set_acceptor(Pid, S);
{close, From, Peer} ->
close_socket(From, Peer, S);
{send, From, Peer, Data} ->
send_to_socket(From, Peer, Data, S);
{recv, From, Peer} ->
recv_from_socket(From, Peer, S);
{info, From, state} ->
From ! {info, Self, S},
listener_loop(S);
{'EXIT', Mr, Reson} ->
exit(Reson);
{'EXIT', Self, Reson} ->
exit(Reson);
{'EXIT', Current, Reson} ->
listener_loop(S#state{acceptor=false});
{'EXIT', Pid, Reson} ->
listener_loop(S#state{active=active_delete(Pid, Active)});
{udp, _S, IP, Port, Data} ->
dispatch_packet({IP, Port}, Data, S);
Other ->
{ok, Peer} = inet:sockname(Sock),
error_logger:format("~w:~p Unexpected message: ~p~n",
[?MODULE, Peer, Other]),
listener_loop(S)
end.
set_acceptor(Pid, S=#state{socket=Sock, acceptor=false}) ->
% inet:setopts(Sock, [{active, once}]),
link(Pid),
new_connection(S#state{acceptor=Pid});
set_acceptor(Pid, S=#state{acceptor=Current}) when pid(Current) ->
Pid ! {error, self(), already_exist},
listener_loop(S).
dispatch_packet(Peer, Data, S=#state{active=Active}) ->
case active_lookup(Peer, Active) of
undefined ->
new_connection(Peer, Data, S);
{Pid, _} ->
Pid ! {recv, self(), Data},
listener_loop(S)
end.
new_connection(S=#state{acceptor=Pid, inq=Q, inq_size=QSz}) when pid(Pid) ->
case queue:out(Q) of
{empty, NQ} -> %input queue is empty
listener_loop(S);
{{value, {Peer, Data}}, NQ} ->
new_connection(Peer, Data, S#state{inq=NQ, inq_size=QSz - 1})
end.
new_connection(Peer, Data,
S=#state{acceptor=false, inq=Q, inq_size=QSz, inq_max=Max}) ->
if QSz >= Max ->
listener_loop(S); % no active acceptors, queue is full - discard
true -> % put in queue
listener_loop(S#state{inq=queue:in({Peer, Data}, Q), inq_size=QSz+1})
end;
new_connection(Peer, Data,
S=#state{master=Mr, acceptor=Current, active=Active}) ->
NewSocket = #tac_udp_socket{listen_pid=self(), peer=Peer},
Current ! {connected, self(), NewSocket},
Current ! {recv, self(), Data},
listener_loop(S#state{acceptor=false,
active=active_add({Current, Peer}, Active)}).
send_to_socket(Pid, Peer={IP, Port}, Data,
S=#state{socket=Sock}) ->
case gen_udp:send(Sock, IP, Port, Data) of
ok ->
Pid ! {sended, self()};
{error, Reson} ->
Pid ! {error, self(), Reson}
end,
listener_loop(S).
recv_from_socket(From, Peer,
S=#state{socket=Sock}) ->
% inet:setopts(Sock, [{active, once}]),
listener_loop(S).
close_socket(Pid, Peer, S=#state{active=Active}) ->
NewActive = active_delete({Pid, Peer}, Active),
if length(NewActive) == length(Active) ->
Pid ! {error, self(), not_exist};
true ->
Pid ! {closed, self()}
end,
listener_loop(S#state{active=NewActive}).
sockname(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer}) ->
LrPid ! {sockname, self()},
receive
{sockname, LrPid, Ret} ->
Ret;
{error, LrPid, Reson} ->
{error, Reson}
end.
peername(S=#tac_udp_socket{peer=Peer}) ->
{ok, Peer}.
do_recv(S=#tac_udp_socket{listen_pid=LrPid, peer=Peer},
State = {Size, Timeout}, AccBin) ->
receive
{recv, LrPid, RecvBin} ->
Bin = erlang:concat_binary([AccBin, RecvBin]),
BinSize = size(Bin),
if (Size == 0) or (BinSize == Size) ->
{ok, Bin, State};
BinSize < Size ->
do_recv(S, State, Bin);
true ->
{RetBin, _} = erlang:split_binary(Bin, Size),
{ok, RetBin, State}
end;
{error, LrPid, Reson} ->
{error, Reson}
after Timeout ->
{error, timeout}
end.
active_add(New, Active) ->
[New|Active].
active_delete(Pid, Active) when pid(Pid) ->
[Conn || Conn <- Active, element(1, Conn) =/= Pid];
active_delete(Conn, Active) ->
lists:delete(Conn, Active).
active_lookup(Pid, Active) when pid(Pid) ->
case lists:keysearch(Pid, 1, Active) of
{value, V} ->
V;
_ ->
undefined
end;
active_lookup(Peer={_IP, _Port}, Active) ->
case lists:keysearch(Peer, 2, Active) of
{value, V} ->
V;
_ ->
undefined
end.
%%start_timer(infinity, _) ->
%% infinity;
%%start_timer(Timeout, Msg) ->
%% erlang:start_timer(Timeout, self(), Msg).
%%
%%cancel_timer(infinity) ->
%% ok;
%%cancel_timer(Ref) ->
%% erlang:cancel_timer(Ref),
%% receive
%% {timeout, Ref, _} ->
%% ok
%% after 0 ->
%% ok
%% end.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File : tac_tcp_socket.erl
%%% Author : <svg@REDACTED>
%%% Description :
%%%
%%% Created : 14 Jun 2002 by <svg@REDACTED>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%%
%% .* tac_tcp_socket
%%
%%*
-module(tac_tcp_socket).
-export([listen/3, connect/3, connect/4, accept/1, close/1]).
-export([send/2, recv/3, sockname/1, peername/1]).
-include("dbg.hrl").
-record(state, {socket, master}).
listen(Port, Max, Opts) ->
DefOpts = [binary, {active, false}],
BackLog = if is_integer(Max) ->
[{backlog, Max}];
true ->
[]
end,
Params = BackLog ++ DefOpts ++ Opts,
case gen_tcp:listen(Port, Params) of
{ok, Socket} ->
Self = self(),
LrPid = spawn_link(fun () -> start_listener(Self, Socket) end),
gen_tcp:controlling_process(Socket, LrPid),
LrPid ! {start, Self},
{ok, LrPid};
Error ->
Error
end.
connect(Addr, Port, Opts) ->
connect(Addr, Port, Opts, infinity).
connect(Addr, Port, Opts, Timeout) ->
{ok, IP} = inet:getaddr(Addr, inet),
gen_tcp:connect(IP, Port, [binary, {packet, 0}, {active, false}|Opts], Timeout).
accept(LrPid) when pid(LrPid) ->
gen_tcp:accept(get_socket(LrPid)).
close(Pid) when pid(Pid) ->
exit(Pid, normal);
close(S) ->
gen_tcp:close(S).
send(S, Bin) ->
case gen_tcp:send(S, Bin) of
ok ->
ok;
Error ->
close(S),
Error
end.
sockname(Pid) when pid(Pid)->
sockname(get_socket(Pid));
sockname(S) ->
inet:sockname(S).
peername(S) ->
inet:peername(S).
get_socket(LrPid) ->
LrPid ! {get_socket, self()},
receive
{socket, LrPid, Socket} ->
Socket
end.
recv(init, _S, Timeout) when integer(Timeout);
Timeout == infinity ->
recv(init, _S, {0, Timeout});
recv(init, _S, RS={Size, Timeout}) when integer(Size);
Size >= 0 ->
RS;
recv(next, S, {Size, Timeout}) ->
do_recv(S, Size, Timeout);
recv(cancel, _S, _RS) ->
ok.
%% Internal funs
do_recv(S, Size, Timeout) ->
NxtState = {Size, Timeout},
case gen_tcp:recv(S, Size, Timeout) of
{ok, Data} ->
{ok, Data, NxtState};
Error={error, timeout} ->
Error;
Error={error, Reson} ->
gen_tcp:close(S),
Error;
Other ->
gen_tcp:close(S),
{error, Other}
end.
start_listener(Serv, Socket) ->
receive
{start, Serv} ->
process_flag(trap_exit, true),
listener_loop(#state{socket=Socket, master=Serv})
end.
listener_loop(S=#state{master=Mr, socket=Sock}) ->
Self = self(),
receive
{get_socket, Pid} ->
Pid ! {socket, Self, Sock},
listener_loop(S);
{'EXIT', Self, Reson} ->
close(Sock),
exit(Reson);
{'EXIT', Mr, Reson} ->
close(Sock),
exit(Reson);
Other ->
{ok, Peer} = inet:sockname(Sock),
error_logger:format("~w:~p Unexpected message: ~p~n",
[?MODULE, Peer, Other]),
listener_loop(S)
end.
%%recv(init, S, Timeout) when integer(Timeout);
%% Timeout == none ->
%% Ref = start_timer(Timeout, recv_timeout),
%% Ref;
%%recv(next, S, Ref) ->
%% do_recv(S, Ref);
%%recv(cancel, S, Ref) ->
%% cancel_timer(Ref).
%%
%%do_recv(S, Ref) ->
%% inet:setopts(S, [{active, once}]),
%% receive
%% {tcp, _P, Bin} ->
%% {ok, Bin, Ref};
%% {timeout, Ref, recv_timeout} ->
%% close(S),
%% {error, timeout};
%% {tcp_closed, P} ->
%% close(P),
%% cancel_timer(Ref),
%% {error, closed};
%% {tcp_error, P, Reson} ->
%% close(P),
%% cancel_timer(Ref),
%% {error, Reson}
%% end.
%%
%%start_timer(none, _) ->
%% none;
%%start_timer(Timeout, Msg) ->
%% erlang:start_timer(Timeout, self(), Msg).
%%
%%cancel_timer(none) ->
%% ok;
%%cancel_timer(Ref) ->
%% erlang:cancel_timer(Ref),
%% receive
%% {timeout, Ref, _} ->
%% ok
%% after 0 ->
%% ok
%% end.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File : tac_connect.erl
%%% Author : <svg@REDACTED>
%%% Description : TCP/IP/UDP client
%%%
%%% Created : 12 Jun 2002 by <svg@REDACTED>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%%
%% .* tac_connect
%%
%%*
-module(tac_connect).
-export([start/6]).
-include("dbg.hrl").
start(Addr, Port, TrMod, PctFun, PrtFun, Args) ->
[Timeout, TrOpts] = [getopt:value(Key, Opts)
|| Key <- [timeout, transp_opts],
Opts <- [check_opts(Args)]],
case TrMod:connect(Addr, Port, TrOpts, Timeout) of
{ok, Socket} ->
tac_socket_mdr:client(Socket, TrMod, PctFun, PrtFun);
{error, Error} ->
exit(Error);
{'EXIT', Error} ->
exit(Error)
end.
check_opts(Args) ->
ChkTimeout = fun (N) when integer(N), N > 0 ->
{ok, N};
(N=infinity) ->
{ok, N}
end,
Opts = getopt:options(Args,
[
{timeout, infinity, ChkTimeout},
{transp_opts, [], list}
], defined_strict),
Opts.
-------------- next part --------------
%%%-------------------------------------------------------------------
%%% File : tac_listen.erl
%%% Author : <svg@REDACTED>
%%% Description : TCP/IP/UDP server, accepts connections and spawns protocol
%%% and transport modules
%%%
%%% Created : 12 Jun 2002 by <svg@REDACTED>
%%%-------------------------------------------------------------------
%%
%% $Id$
%%
%% $Log$
%%
%%**
%%
%% .* tac_listen
%%
%%*
-module(tac_listen).
-behaviour(gen_server).
%%--------------------------------------------------------------------
%% Include files
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% External exports
-export([start_link/5, start_link/6, stop/1, info/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Internal exports
-export([new_connect/2]).
-include("dbg.hrl").
-record(state, {listen_pid, % Listener pid
acceptor, %Current acceptor
active=[], %Active connections
max_conn=infinity, %Max connections number
transp_mod, %Transport module
packet_fun, %Packet fun
proto_fun, %Protocol fun
acl_fun=fun allow_all/2 % ACL function to permit/deny connection
}).
-ifndef(DEBUG).
-define(DEBUG, 1).
-endif.
-ifdef(DEBUG).
-define(DEBUG_TAC_LISTEN, [debug]).
-else.
-define(DEBUG_TAC_LISTEN, []).
-endif. %DEBUG
-ifndef(TAC_LISTEN_TIMEOUT).
-define(TAC_LISTEN_TIMEOUT, 30000).
-endif.
-define(MAX_CONNECTIONS, 256).
-define(SERVER(P),
list_to_atom(lists:flatten(io_lib:format(?MODULE_STRING "_~w",[P])))).
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Port, TrMod, PctFun, PrtFun, Args) ->
start_link(?SERVER(Port), Port, TrMod, PctFun, PrtFun, Args).
start_link(Name, Port, TrMod, PctFun, PrtFun, Args) ->
gen_server:start_link({local, Name},
?MODULE, {Port, TrMod, PctFun, PrtFun, Args},
?DEBUG_TAC_LISTEN).
stop(Port) when integer(Port) ->
stop(?SERVER(Port));
stop(Name) ->
destroy(whereis(Name)).
info(Port, Query) when integer(Port) ->
info(?SERVER(Port), Query);
info(Server, Query) ->
gen_server:call(Server, {info, Query}).
%%====================================================================
%% Server functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%--------------------------------------------------------------------
init({Port, TrMod, PctFun, PrtFun, Args}) ->
[Max, AclFun, TrOpts] = [getopt:value(Key, Opts)
|| Key <- [max, acl_fun, transp_opts],
Opts <- [check_opts(Args)]],
process_flag(trap_exit, true),
case TrMod:listen(Port, Max, TrOpts) of
{ok, LrPid} ->
Pid = start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun),
?dbg("started at port ~w",[Port]),
{ok, #state{listen_pid = LrPid,
acceptor = Pid,
transp_mod = TrMod,
proto_fun = PrtFun,
packet_fun = PctFun,
acl_fun = AclFun,
max_conn = Max
}};
{error, Reason} ->
{stop, Reason};
Other ->
{stop, Other}
end.
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call({info, state}, From,
S=#state{listen_pid=LrPid,
acceptor=Acceptor,
active=Active,
max_conn=Max,
transp_mod=TrMod
}) ->
Reply = [{listen_pid, LrPid},
{acceptor, Acceptor},
{transp_mod, TrMod},
{active, length(Active)},
{max_conn, Max}
],
{reply, Reply, S};
handle_call({info, active_pids}, From, S=#state{active=A}) ->
{reply, A, S};
handle_call(stop, From, State) ->
{stop, normal, ok, State};
handle_call(Request, From, State) ->
Reply = {bad_request, Request},
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast({connected, Current},
S=#state{acceptor=Current, active=As, max_conn=Max, transp_mod=TrMod,
listen_pid=LrPid, packet_fun=PcF, proto_fun=PrF, acl_fun=AcF}) ->
AsLen = length(As),
?dbg("new connection ~p, connections ~w", [Current, AsLen]),
New = another_accept(LrPid, TrMod, PcF, PrF, AcF, AsLen, Max, false),
{noreply, S#state{acceptor=New, active=[Current|As]}};
handle_cast(_, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({'EXIT', LrPid, Reson}, S=#state{listen_pid=LrPid}) ->
?dbg("listener ~p exited, reson ~p", [LrPid, Reson]),
{stop, Reson, S#state{listen_pid=none}};
handle_info({'EXIT', Current, Reson},
S=#state{listen_pid=LrPid, acceptor=Current,
active=As, max_conn=Max,
transp_mod=TM, proto_fun=PrF, packet_fun=PcF,
acl_fun=AcF}) ->
AsLen = length(As),
?dbg("acceptor ~p exited, reson ~p, connections ~w", [Current, Reson,AsLen]),
New = another_accept(LrPid, TM, PcF, PrF, AcF, AsLen, Max, false),
{noreply, S#state{acceptor=New}};
handle_info({'EXIT', Pid, Reson},
S=#state{listen_pid=LrPid, acceptor=A, active=As, max_conn=Max,
transp_mod=TM, proto_fun=PrF, packet_fun=PcF,
acl_fun=AcF}) ->
AsLen = length(As),
As1 = lists:delete(Pid, As),
AsLen1 = length(As1),
if AsLen == AsLen1 ->
{noreply, S};
true ->
?dbg("session ~p exited, reson ~p, connections ~w",
[Pid, Reson, AsLen1]),
New = another_accept(LrPid, TM, PcF, PrF, AcF, AsLen1, Max, A),
{noreply, S#state{acceptor=New, active=As1}}
end;
handle_info(Info, S=#state{listen_pid=LrPid, transp_mod=TM}) ->
Port = case TM:sockname(LrPid) of
{ok, {_, P}} ->
P;
_Error ->
unknown
end,
error_logger:format("~w Unexpected info: ~p~n",
[process_info(self(),registered_name) , Info]),
{noreply, S};
handle_info(Info, S) ->
{noreply, S}.
%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(Reason, S=#state{listen_pid=LrPid, transp_mod=TrMod,
acceptor=Current, active=Active}) ->
case LrPid of
_ when pid(LrPid) ->
TrMod:close(LrPid);
_ ->
ok
end,
Children =
case Current of
_ when pid(Current) ->
[Current|Active];
_ ->
Active
end,
lists:foreach(fun (P) -> exit(P, stop) end, Children),
?dbg("Terminating", []),
ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%--------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
destroy(Server) when pid(Server) ->
gen_server:call(Server, stop, ?TAC_LISTEN_TIMEOUT);
destroy(Other) ->
{error, Other}.
start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun) ->
Self = self(),
spawn_link(tac_socket_mdr, server,
[Self, LrPid, TrMod, PctFun, PrtFun, AclFun]).
%% return new acceptor
another_accept(LrPid, TrMod, PctFun, PrtFun, AclFun, Active, Max, false) ->
if Active < Max -> % lower than limit
start_accept(LrPid, TrMod, PctFun, PrtFun, AclFun);
true ->
?dbg("blocking accept, active ~w, max ~w", [Active, Max]),
false
end;
another_accept(_LrPid, _TM, _PcF, _PrF, _AF, _As, _Max, Acceptor) ->
Acceptor.
new_connect(Server, New) ->
gen_server:cast(Server, {connected, New}).
check_opts(Args) ->
ChkMax = fun (N) when integer(N), N > 0 ->
{ok, N};
(N=infinity) ->
{ok, N}
end,
Opts = getopt:options(Args,
[
{acl_fun, fun allow_all/2, function},
{max, ?MAX_CONNECTIONS, ChkMax},
{transp_opts, [], list}
], defined_strict),
Opts.
allow_all(RemotePeer, LocalPeer) ->
permit.
More information about the erlang-questions
mailing list