TCP stack throughput

Joel Reymont <>
Mon Jul 4 15:37:49 CEST 2005


The files are not available for some reason so I'm pasting them here  
just in case...

-module(fac).
-behaviour(gen_server).

-export([init/1, handle_call/3, handle_cast/2,
      handle_info/2, terminate/2, code_change/3]).
-export([start/1, stop/1]).
-export([fac/4]).

-record(server, {
       self,
       port,
       players,
       pending
      }).

new(Port) ->
     #server {
      self = self(),
      port = Port
     }.

start(Port) ->
     gen_server:start(fac, [Port], []).

init([Port]) ->
     process_flag(trap_exit, true),
     Server = new(Port),
     tcp_server:start_raw_server(Port,
                 fun(Sock) -> handler(Sock, Server) end,
                 50000, 2048),
     {ok, Server}.

stop(Server) ->
     gen_server:cast(Server, stop).

terminate(normal, Server) ->
     tcp_server:stop(Server#server.port),
     ok.

handle_cast(stop, Server) ->
     {stop, normal, Server};

handle_cast(Event, Server) ->
     error_logger:info_report([{module, ?MODULE},
                   {line, ?LINE},
                   {self, self()},
                   {message, Event}]),
     {noreply, Server}.

handle_call(Event, From, Server) ->
     error_logger:info_report([{module, ?MODULE},
                   {line, ?LINE},
                   {self, self()},
                   {message, Event},
                   {from, From}]),
     {noreply, Server}.

handle_info({'EXIT', _Pid, _Reason}, Server) ->
     %% child exit?
     {noreply, Server};

handle_info(Info, Server) ->
     error_logger:info_report([{module, ?MODULE},
                   {line, ?LINE},
                   {self, self()},
                   {message, Info}]),
     {noreply, Server}.

code_change(_OldVsn, Server, _Extra) ->
     {ok, Server}.

%%%
%%% Implementation
%%%

fac(N) when N > 0 ->
      N * fac(N - 1);
fac(0) ->
     1.

fac(_Host, _Port, _N, 0) ->
     ok;

fac(Host, Port, N, Max) ->
     Fun = fun() ->
           case tcp_server:start_client(Host, Port, 2048) of
               {ok, Sock} ->
               case gen_tcp:send(Sock, <<Max:32, N:32>>) of
                   ok ->
                   receive
                       {tcp, Sock, <<Fac:2048>>} ->
                       io:format("#~w: fac(~w) = ~w~n",
                             [Max, N, Fac]);
                       {error, closed} ->
                       io:format("#~w: connection closed~n",
                             [Max]);
                       Any ->
                       io:format("#~w: receive: ~w~n",
                             [Max, Any])
                   end,
                   gen_tcp:close(Sock);
                   Any ->
                   io:format("#~w: gen_tcp: ~w~n", [Max, Any])
               end;
               Any ->
               io:format("#~w: start_client: ~w~n", [Max, Any])
           end
       end,
     spawn(Fun),
     fac(Host, Port, N, Max - 1).

handler(Socket, Parent) ->
     receive
     {tcp, Socket, <<Iter:32, N:32>>} ->
         io:format("#~w: Calculating factorial(~w)~n", [Iter, N]),
         Fac = fac(N),
         gen_tcp:send(Socket, <<Fac:2048>>),
         handler(Socket, Parent);
     {tcp_closed, Socket} ->
         %%io:format("Socket ~w closed~n", [Socket]);
         ok;
     Any ->
         io:format("Got ~w in the handler~n", [Any]),
         gen_tcp:send(Socket, <<0:2048>>)
     end.

-----------------------
%% Copyright (C) 2002, Joe Armstrong
%% File    : tcp_server.erl
%% Author  : Joe Armstrong ()
%% Purpose : Keeps track of a number of TCP sessions
%% Last modified: 2002-11-17

-module(tcp_server).

-export([start_raw_server/4, start_client/3,
      stop/1, children/1, send/2]).

-define(KILL_DELAY, 1000).

%% -export([start_child/3]).

%% start_raw_server(Port, Fun, Max)
%%   This server accepts up to Max connections on Port
%%   The *first* time a connection is made to Port
%%   Then Fun(Socket) is called.
%%   Thereafter messages to the socket result in messsages to the  
handler.

%% a typical server is usually written like this:

%% To setup a lister

%% start_server(Port) ->
%%     S = self(),
%%     process_flag(trap_exit, true),
%%     tcp_server:start_raw_server(Port,
%%                 fun(Socket) -> input_handler(Socket, S) end,
%%                 15,
%%                              0)
%%     loop().

%% The loop() process is a central controller that all
%% processes can use to synchronize amongst themselfves if necessary
%% It ends up as the variable "Controller" in the input_handler

%% A typical server is written like this:

%% input_handler(Socket, Controller) ->
%%     receive
%%     {tcp, Socket, Bin} ->
%%         ...
%%         gen_tcp:send(Socket, ...)
%%
%%     {tcp_closed, Socket} ->
%%
%%
%%     Any ->
%%         ...
%%
%%     end.

start_client(Host, Port, Length) ->
      gen_tcp:connect(Host, Port,
              [binary,
               {active, true},
               {packet, 2},
               {packet_size, Length}], 10000).

%% Note when start_raw_server returns it should be ready to
%% Immediately accept connections

start_raw_server(Port, Fun, Max, Length) ->
     Name = port_name(Port),
     case whereis(Name) of
     undefined ->
         Self = self(),
         Pid = spawn_link(fun() ->
                      cold_start(Self, Port, Fun, Max, Length)
                  end),
         receive
         {Pid, ok} ->
             register(Name, Pid),
             {ok, Pid};
         {Pid, Error} ->
             Error
         end;
     _Pid ->
         {error, already_started}
     end.

stop(Port) when integer(Port) ->
     Name = port_name(Port),
     case whereis(Name) of
     undefined ->
         not_started;
     Pid ->
         exit(Pid, kill),
         (catch unregister(Name)),
         stopped
     end.

children(Port) when integer(Port) ->
     port_name(Port) ! {children, self()},
     receive
     {session_server, Reply} -> Reply
     end.

port_name(Port) when integer(Port) ->
     list_to_atom("portServer" ++ integer_to_list(Port)).

cold_start(Master, Port, Fun, Max, Length) ->
     process_flag(trap_exit, true),
     io:format("Starting a port server on ~p...~n",[Port]),
     case gen_tcp:listen(Port, [binary,
                    %% {dontroute, true},
                    {nodelay,true},
                    {packet_size, Length},
                    {packet, 2},
                    {reuseaddr, true},
                    {active, false}]) of
     {ok, Listen} ->
         %% io:format("Listening on:~p~n",[Listen]),
         Master ! {self(), ok},
         New = start_accept(Listen, Fun),
         %% Now we're ready to run
         socket_loop(Listen, New, [], Fun, Max);
     Error ->
         Master ! {self(), Error}
     end.

%% Don't mess with the following code uless you really know what you're
%% doing (and Thanks to Magnus for heping me get it right)

socket_loop(Listen, New, Active, Fun, Max) ->
     receive
     {istarted, New} ->
         Active1 = [New|Active],
         possibly_start_another(false, Listen, Active1, Fun, Max);
     {'EXIT', New, _Why} ->
         %%io:format("Child exit=~p~n",[Why]),
         possibly_start_another(false, Listen, Active, Fun, Max);
     {'EXIT', Pid, _Why} ->
         %%io:format("Child exit=~p~n",[Why]),
         Active1 = lists:delete(Pid, Active),
         possibly_start_another(New, Listen, Active1, Fun, Max);
     {children, From} ->
         From ! {session_server, Active},
         socket_loop(Listen, New, Active, Fun, Max);
     Other ->
         io:format("Here in loop:~p~n",[Other])
     end.

possibly_start_another(New, Listen, Active, Fun, Max) when pid(New) ->
     socket_loop(Listen, New, Active, Fun, Max);
possibly_start_another(false, Listen, Active, Fun, Max) ->
     case length(Active) of
     N when N < Max ->
         New = start_accept(Listen, Fun),
         socket_loop(Listen, New, Active, Fun, Max);
     _ ->
         error_logger:warning_report(
           [{module, ?MODULE},
            {line, ?LINE},
            {message, "Connections maxed out"},
            {maximum, Max},
            {connected, length(Active)},
            {now, now()}]),
         socket_loop(Listen, false, Active, Fun, Max)
     end.

start_accept(Listen, Fun) ->
     S = self(),
     spawn_link(fun() -> start_child(S, Listen, Fun) end).

start_child(Parent, Listen, Fun) ->
     case gen_tcp:accept(Listen) of
     {ok, Socket} ->
         Parent ! {istarted,self()},            % tell the controller
         inet:setopts(Socket, [{nodelay,true},
                   {packet, 2},
                   {active, true}]), % before we activate socket
         %% send an event to kill the connection
         %% after a certain time unless
         %% a login packet is received.
         %%erlang:send_after(?KILL_DELAY, self(), 'KILL'),
         Fun(Socket);
     _Other ->
         exit(oops)
     end.

send(Socket, Data) ->
     case gen_tcp:send(Socket, Data) of
     ok ->
         ok;
     Error ->
         error_logger:error_report([{message, "gen_tcp:send error"},
                        {module, ?MODULE},
                        {line, ?LINE},
                        {socket, Socket},
                        {port_info, erlang:port_info(Socket,  
connected)},
                        {data, Data},
                        {error, Error}])
     end.






More information about the erlang-questions mailing list