TCP stack throughput
Joel Reymont
joelr1@REDACTED
Mon Jul 4 15:37:49 CEST 2005
The files are not available for some reason so I'm pasting them here
just in case...
-module(fac).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
-export([start/1, stop/1]).
-export([fac/4]).
%% State record of the fac gen_server.
%%   self    - pid of the process that built the record (set by new/1)
%%   port    - TCP port number given to new/1
%%   players - not set by new/1, so it stays `undefined'; no code in this
%%             module reads it
%%   pending - not set by new/1, so it stays `undefined'; no code in this
%%             module reads it
-record(server, {
self,
port,
players,
pending
}).
%% Build the initial #server{} state for the given TCP port.
%% The calling process is stored in the `self' field; every other field
%% keeps its record default.
new(Port) ->
    Owner = self(),
    #server{port = Port, self = Owner}.
%% Start an unlinked, unregistered fac server for Port.
%% Returns whatever gen_server:start/3 returns.
start(Port) ->
    InitArgs = [Port],
    gen_server:start(?MODULE, InitArgs, []).
%% gen_server callback: start the raw TCP server on Port.
%%
%% Exits are trapped because tcp_server:start_raw_server/4 links the
%% port-server process to this one. Every accepted connection is served
%% by handler/2, which receives the initial server state.
%%
%% Returns {ok, State} once the listener is up. If the listener cannot
%% be started (port in use, already started, ...) the server stops with
%% that reason instead of pretending to be running.
init([Port]) ->
    process_flag(trap_exit, true),
    Server = new(Port),
    OnConnect = fun(Sock) -> handler(Sock, Server) end,
    case tcp_server:start_raw_server(Port, OnConnect, 50000, 2048) of
        {ok, _PortServer} ->
            {ok, Server};
        {error, Reason} ->
            {stop, Reason}
    end.
%% Ask the fac server identified by ServerRef to stop.
%% This is a cast: it returns ok immediately, without waiting for the
%% server to terminate.
stop(ServerRef) ->
    gen_server:cast(ServerRef, stop).
%% gen_server callback: shut down the TCP port server on termination.
%%
%% The reason is deliberately not matched: the listener must be stopped
%% whether the server ends normally, is shut down, or crashes. Matching
%% only `normal' would raise function_clause for every other reason and
%% leave the port server registered and listening.
terminate(_Reason, #server{port = Port}) ->
    tcp_server:stop(Port),
    ok.
%% gen_server callback for asynchronous requests.
%%   stop          - terminate the server with reason `normal'
%%   anything else - log the message and carry on with unchanged state
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(Unexpected, State) ->
    Report = [{module, ?MODULE},
              {line, ?LINE},
              {self, self()},
              {message, Unexpected}],
    error_logger:info_report(Report),
    {noreply, State}.
%% gen_server callback for synchronous requests.
%%
%% This server defines no calls, so every request is logged and answered
%% with {error, unknown_call}. A reply is always sent: returning
%% `noreply' without keeping From would leave the caller blocked until
%% its gen_server:call timed out.
handle_call(Event, From, Server) ->
    error_logger:info_report([{module, ?MODULE},
                              {line, ?LINE},
                              {self, self()},
                              {message, Event},
                              {from, From}]),
    {reply, {error, unknown_call}, Server}.
%% gen_server callback for plain messages.
%%   {'EXIT', Pid, Reason} - exit signal of a linked process (exits are
%%                           trapped in init/1); silently ignored
%%   anything else         - logged, state unchanged
handle_info({'EXIT', _From, _Why}, State) ->
    {noreply, State};
handle_info(Unexpected, State) ->
    Report = [{module, ?MODULE},
              {line, ?LINE},
              {self, self()},
              {message, Unexpected}],
    error_logger:info_report(Report),
    {noreply, State}.
%% gen_server callback for hot code upgrade: the state needs no
%% conversion and is returned unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%%%
%%% Implementation
%%%
%% fac(N): factorial of the non-negative integer N.
%% Implemented with an accumulator; any argument that never reaches
%% the integer 0 by repeated decrement fails with function_clause.
fac(N) ->
    fac_acc(N, 1).

%% fac_acc(K, Acc): multiply Acc by K, K-1, ..., 1.
fac_acc(K, Acc) when K > 0 ->
    fac_acc(K - 1, K * Acc);
fac_acc(0, Acc) ->
    Acc.
%% fac(Host, Port, N, Max): load generator for the fac server.
%%
%% Spawns Max independent client processes, numbered Max down to 1.
%% Each one connects to Host:Port, sends <<Id:32, N:32>>, waits for one
%% message and prints the outcome. Returns ok once all clients have been
%% spawned; it does not wait for them to finish.
%%
%% Max must be a non-negative integer. Any other value is rejected with
%% function_clause instead of spawning clients forever.
%% The clients are deliberately not linked to the caller: one failing
%% connection must not take down the caller or the other clients.
fac(_Host, _Port, _N, 0) ->
    ok;
fac(Host, Port, N, Max) when is_integer(Max), Max > 0 ->
    spawn(fun() -> fac_client(Host, Port, N, Max) end),
    fac(Host, Port, N, Max - 1).

%% One client run: connect, then hand over to fac_request/3.
fac_client(Host, Port, N, Id) ->
    case tcp_server:start_client(Host, Port, 2048) of
        {ok, Sock} ->
            fac_request(Sock, N, Id);
        Any ->
            io:format("#~w: start_client: ~w~n", [Id, Any])
    end.

%% Send the request and, if that worked, wait for the answer and close.
fac_request(Sock, N, Id) ->
    case gen_tcp:send(Sock, <<Id:32, N:32>>) of
        ok ->
            fac_await(Sock, N, Id),
            gen_tcp:close(Sock);
        Any ->
            io:format("#~w: gen_tcp: ~w~n", [Id, Any])
    end.

%% Wait for exactly one message and report it.
%% The socket is in active mode, so a closed connection arrives as
%% {tcp_closed, Sock}; an {error, closed} tuple is never delivered as a
%% message and matching on it could never succeed.
fac_await(Sock, N, Id) ->
    receive
        {tcp, Sock, <<Fac:2048>>} ->
            io:format("#~w: fac(~w) = ~w~n", [Id, N, Fac]);
        {tcp_closed, Sock} ->
            io:format("#~w: connection closed~n", [Id]);
        Any ->
            io:format("#~w: receive: ~w~n", [Id, Any])
    end.
%% handler(Socket, Parent): per-connection loop run by the process that
%% accepted Socket.
%%
%%   {tcp, Socket, <<Iter:32, N:32>>} - compute fac(N), send it back as
%%       a 2048-bit integer and keep serving. The binary segment keeps
%%       only the low 2048 bits, so larger factorials are truncated.
%%   {tcp_closed, Socket}             - peer closed; return ok
%%   anything else                    - log it, send 2048 zero bits and
%%                                      return
%%
%% Parent is only passed along to the next iteration.
handler(Socket, Parent) ->
    receive
        {tcp, Socket, <<Iter:32, N:32>>} ->
            io:format("#~w: Calculating factorial(~w)~n", [Iter, N]),
            Reply = <<(fac(N)):2048>>,
            gen_tcp:send(Socket, Reply),
            handler(Socket, Parent);
        {tcp_closed, Socket} ->
            ok;
        Unexpected ->
            io:format("Got ~w in the handler~n", [Unexpected]),
            gen_tcp:send(Socket, <<0:2048>>)
    end.
-----------------------
%% Copyright (C) 2002, Joe Armstrong
%% File : tcp_server.erl
%% Author : Joe Armstrong (joe@REDACTED)
%% Purpose : Keeps track of a number of TCP sessions
%% Last modified: 2002-11-17
-module(tcp_server).
-export([start_raw_server/4, start_client/3,
stop/1, children/1, send/2]).
-define(KILL_DELAY, 1000).
%% -export([start_child/3]).
%% start_raw_server(Port, Fun, Max, Length)
%% This server accepts up to Max connections on Port.
%% The *first* time a connection is made to Port,
%% Fun(Socket) is called.
%% Thereafter messages to the socket result in messages to the handler.
%% Length is passed to gen_tcp:listen/2 as the {packet_size, Length} option.
%% A typical server is usually written like this:
%% To set up a listener
%% start_server(Port) ->
%% S = self(),
%% process_flag(trap_exit, true),
%% tcp_server:start_raw_server(Port,
%% fun(Socket) -> input_handler(Socket, S) end,
%% 15,
%% 0),
%% loop().
%% The loop() process is a central controller that all
%% processes can use to synchronize amongst themselves if necessary.
%% It ends up as the variable "Controller" in the input_handler.
%% A typical server is written like this:
%% input_handler(Socket, Controller) ->
%% receive
%% {tcp, Socket, Bin} ->
%% ...
%% gen_tcp:send(Socket, ...)
%%
%% {tcp_closed, Socket} ->
%%
%%
%% Any ->
%% ...
%%
%% end.
%% start_client(Host, Port, Length): open a client connection to
%% Host:Port. The socket is binary, active, uses a 2-byte length header
%% per packet and limits packets to Length bytes. The connect attempt
%% times out after 10000 ms. Returns the result of gen_tcp:connect/4.
start_client(Host, Port, Length) ->
    ConnectOpts = [binary,
                   {active, true},
                   {packet, 2},
                   {packet_size, Length}],
    gen_tcp:connect(Host, Port, ConnectOpts, 10000).
%% Note when start_raw_server returns it should be ready to
%% Immediately accept connections
%% start_raw_server(Port, Fun, Max, Length)
%%
%% Start the port server for Port unless one is already registered.
%% Returns {ok, Pid} once the listen socket is open,
%% {error, already_started} if the registered name is taken, or the
%% error term reported by the new process when it could not listen.
start_raw_server(Port, Fun, Max, Length) ->
    Name = port_name(Port),
    case whereis(Name) of
        undefined ->
            launch_port_server(Name, Port, Fun, Max, Length);
        _Running ->
            {error, already_started}
    end.

%% Spawn the linked port-server process and wait for its start report.
%% The name is registered only after the process has confirmed that it
%% is listening.
launch_port_server(Name, Port, Fun, Max, Length) ->
    Caller = self(),
    Server = spawn_link(fun() ->
                                cold_start(Caller, Port, Fun, Max, Length)
                        end),
    receive
        {Server, ok} ->
            register(Name, Server),
            {ok, Server};
        {Server, Failure} ->
            Failure
    end.
%% stop(Port): kill the port server registered for Port.
%%
%% Returns `not_started' when no server is registered for Port and
%% `stopped' otherwise. The server is killed unconditionally, which
%% also closes its listen socket.
stop(Port) when is_integer(Port) ->
    Name = port_name(Port),
    case whereis(Name) of
        undefined ->
            not_started;
        Pid ->
            exit(Pid, kill),
            %% The name disappears by itself once the process is dead;
            %% unregistering here only shortens that window. Losing the
            %% race raises badarg, which is the one error ignored.
            try
                unregister(Name)
            catch
                error:badarg -> ok
            end,
            stopped
    end.
%% children(Port): ask the port server for Port for its list of active
%% connection processes and wait for the answer.
%%
%% Fails with badarg if no server is registered for Port. There is no
%% timeout: the call blocks until a {session_server, Reply} message
%% arrives.
children(Port) when is_integer(Port) ->
    port_name(Port) ! {children, self()},
    receive
        {session_server, Reply} -> Reply
    end.
%% port_name(Port): registered name of the port server for Port,
%% e.g. port_name(8080) =:= portServer8080.
%% Port must be an integer; anything else fails with function_clause.
port_name(Port) when is_integer(Port) ->
    list_to_atom("portServer" ++ integer_to_list(Port)).
%% cold_start(Master, Port, Fun, Max, Length): body of the port-server
%% process.
%%
%% Opens the listen socket on Port and reports the outcome to Master as
%% {self(), ok} or {self(), Error}. On success it starts the first
%% acceptor and enters socket_loop/5; on failure it simply returns.
%% Exits are trapped so that acceptor and handler exits arrive as
%% messages in socket_loop/5.
cold_start(Master, Port, Fun, Max, Length) ->
    process_flag(trap_exit, true),
    io:format("Starting a port server on ~p...~n",[Port]),
    ListenOpts = [binary,
                  {nodelay,true},
                  {packet_size, Length},
                  {packet, 2},
                  {reuseaddr, true},
                  {active, false}],
    case gen_tcp:listen(Port, ListenOpts) of
        {ok, ListenSocket} ->
            %% Tell the master first: from here on connections queue up
            %% on the listen socket.
            Master ! {self(), ok},
            FirstAcceptor = start_accept(ListenSocket, Fun),
            socket_loop(ListenSocket, FirstAcceptor, [], Fun, Max);
        Failure ->
            Master ! {self(), Failure}
    end.
%% Don't mess with the following code unless you really know what you're
%% doing (and thanks to Magnus for helping me get it right)
%% socket_loop(Listen, New, Active, Fun, Max): main loop of the port
%% server.
%%
%%   Listen - the listen socket
%%   New    - pid of the acceptor currently waiting in gen_tcp:accept/1,
%%            or `false' when none is running (connection limit reached)
%%   Active - pids of acceptors that have taken a connection
%%   Fun    - connection handler passed on to new acceptors
%%   Max    - maximum length of Active before accepting is paused
%%
%% Messages:
%%   {istarted, New}    - the waiting acceptor got a connection: move it
%%                        to Active and possibly start a new acceptor
%%   {'EXIT', New, _}   - the waiting acceptor died before accepting:
%%                        possibly start a replacement
%%   {'EXIT', Pid, _}   - another linked process exited: drop it from
%%                        Active and resume accepting if it was paused
%%   {children, From}   - reply to From with the Active list
%%   anything else      - logged and ignored; the loop keeps running, so
%%                        a stray message cannot shut the server down
socket_loop(Listen, New, Active, Fun, Max) ->
    receive
        {istarted, New} ->
            Active1 = [New|Active],
            possibly_start_another(false, Listen, Active1, Fun, Max);
        {'EXIT', New, _Why} ->
            possibly_start_another(false, Listen, Active, Fun, Max);
        {'EXIT', Pid, _Why} ->
            Active1 = lists:delete(Pid, Active),
            possibly_start_another(New, Listen, Active1, Fun, Max);
        {children, From} ->
            From ! {session_server, Active},
            socket_loop(Listen, New, Active, Fun, Max);
        Other ->
            io:format("Here in loop:~p~n",[Other]),
            socket_loop(Listen, New, Active, Fun, Max)
    end.
%% possibly_start_another(New, Listen, Active, Fun, Max)
%%
%% Decide whether a new acceptor is needed, then re-enter socket_loop/5.
%%   New is a pid - an acceptor is already waiting; nothing to start
%%   New is false - start one if fewer than Max connections are active,
%%                  otherwise log a warning and continue without an
%%                  acceptor until a connection goes away
possibly_start_another(New, Listen, Active, Fun, Max) when is_pid(New) ->
    socket_loop(Listen, New, Active, Fun, Max);
possibly_start_another(false, Listen, Active, Fun, Max) ->
    case length(Active) of
        Connected when Connected < Max ->
            New = start_accept(Listen, Fun),
            socket_loop(Listen, New, Active, Fun, Max);
        Connected ->
            %% NOTE(review): now/0 is deprecated in newer OTP releases;
            %% it is kept so the report format stays unchanged.
            error_logger:warning_report(
              [{module, ?MODULE},
               {line, ?LINE},
               {message, "Connections maxed out"},
               {maximum, Max},
               {connected, Connected},
               {now, now()}]),
            socket_loop(Listen, false, Active, Fun, Max)
    end.
%% start_accept(Listen, Fun): spawn a linked acceptor process that runs
%% start_child/3 with the calling process as its controller.
%% Returns the acceptor's pid.
start_accept(Listen, Fun) ->
    Controller = self(),
    Acceptor = fun() -> start_child(Controller, Listen, Fun) end,
    spawn_link(Acceptor).
%% start_child(Parent, Listen, Fun): body of an acceptor process.
%%
%% Blocks in gen_tcp:accept/1. On success it notifies Parent with
%% {istarted, self()}, switches the socket to active mode and runs
%% Fun(Socket) as the connection handler. Any other accept result makes
%% the process exit with reason `oops'.
start_child(Parent, Listen, Fun) ->
    serve_accepted(gen_tcp:accept(Listen), Parent, Fun).

%% Act on the result of gen_tcp:accept/1.
serve_accepted({ok, Socket}, Parent, Fun) ->
    %% Tell the controller before the socket is activated.
    Parent ! {istarted, self()},
    SocketOpts = [{nodelay,true},
                  {packet, 2},
                  {active, true}],
    inet:setopts(Socket, SocketOpts),
    %% A timer that would kill the connection unless a login packet
    %% arrives within ?KILL_DELAY ms was planned here but is disabled.
    Fun(Socket);
serve_accepted(_Failure, _Parent, _Fun) ->
    exit(oops).
%% send(Socket, Data): gen_tcp:send/2 with error logging.
%%
%% Returns ok on success. On failure the error is logged together with
%% the socket, its connected process and the data, and the original
%% error term from gen_tcp:send/2 is returned so the caller can react.
%% Returning the logger's result would report `ok' for a failed send.
send(Socket, Data) ->
    case gen_tcp:send(Socket, Data) of
        ok ->
            ok;
        Error ->
            error_logger:error_report([{message, "gen_tcp:send error"},
                                       {module, ?MODULE},
                                       {line, ?LINE},
                                       {socket, Socket},
                                       {port_info, erlang:port_info(Socket,
                                                                    connected)},
                                       {data, Data},
                                       {error, Error}]),
            Error
    end.
More information about the erlang-questions
mailing list