[erlang-questions] UDP concurrent server

Bogdan Andu bog495@REDACTED
Wed Dec 9 16:46:41 CET 2015


the so-called controller is a simple gen_server and is started by a
supervisor when the application starts

supervisor snippet:

init([]) ->
    [{port, Port}] = ets:lookup(config, port),
    [{listen, IPv4}] = ets:lookup(config, listen),

    MpsConn = {mps_controller, {mps_controller, start_link, [Port, IPv4]},
               temporary, 5000, worker, [mps_controller]},

    {ok, {{one_for_all, 3, 10}, [MpsConn]}}.


and controller:

-module(mps_controller).
-behaviour(gen_server).

-include("../include/mps.hrl").

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

%% ====================================================================
%% API functions
%% ====================================================================
-export([start_link/2, stop/0]).

%% ====================================================================
%% Behavioural functions
%% ====================================================================
%% -record(state, {}).

start_link(Port, Ip) ->
    %% register locally so stop/0 can reach the server as ?MODULE
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Ip], []).

stop() ->
    gen_server:call(?MODULE, stop).

%% init/1
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:init-1">gen_server:init/1</a>
-spec init(Args :: term()) -> Result when
    Result :: {ok, State}
            | {ok, State, Timeout}
            | {ok, State, hibernate}
            | {stop, Reason :: term()}
            | ignore,
    State :: term(),
    Timeout :: non_neg_integer() | infinity.
%% ====================================================================
init([Port, Ip]) ->
    process_flag(trap_exit, true),

    {ok, Sock} = gen_udp:open(Port, [binary,
                                     {active, false},
                                     {reuseaddr, true},
                                     {ip, Ip}]),

    %% the 0 timeout makes handle_info(timeout, ...) fire right away,
    %% which arms the socket with {active, once}
    {ok, #udp_conn_state{sock = Sock}, 0}.

%% handle_call/3
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_call-3">gen_server:handle_call/3</a>
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State
:: term()) -> Result when
    Result :: {reply, Reply, NewState}
            | {reply, Reply, NewState, Timeout}
            | {reply, Reply, NewState, hibernate}
            | {noreply, NewState}
            | {noreply, NewState, Timeout}
            | {noreply, NewState, hibernate}
            | {stop, Reason, Reply, NewState}
            | {stop, Reason, NewState},
    Reply :: term(),
    NewState :: term(),
    Timeout :: non_neg_integer() | infinity,
    Reason :: term().
%% ====================================================================
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.


%% handle_cast/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_cast-2">gen_server:handle_cast/2</a>
-spec handle_cast(Request :: term(), State :: term()) -> Result when
    Result :: {noreply, NewState}
            | {noreply, NewState, Timeout}
            | {noreply, NewState, hibernate}
            | {stop, Reason :: term(), NewState},
    NewState :: term(),
    Timeout :: non_neg_integer() | infinity.
%% ====================================================================
handle_cast(_Msg, State) ->
    {noreply, State}.

%% handle_info/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_info-2">gen_server:handle_info/2</a>
-spec handle_info(Info :: timeout | term(), State :: term()) -> Result when
    Result :: {noreply, NewState}
            | {noreply, NewState, Timeout}
            | {noreply, NewState, hibernate}
            | {stop, Reason :: term(), NewState},
    NewState :: term(),
    Timeout :: non_neg_integer() | infinity.
%% ====================================================================

%% a datagram arrived; return an (almost) immediate timeout so the socket
%% is re-armed with {active, once} in the clause below
handle_info({udp, _Socket, _Host, _Port, _Bin}, State) ->
    {noreply, State, 1};

%% re-arm the socket for exactly one more datagram
handle_info(timeout, #udp_conn_state{sock = Sock} = State) ->
    inet:setopts(Sock, [{active, once}]),
    {noreply, State};

%% catch-all: drop anything else so it does not pile up in the mailbox
handle_info(_Info, State) ->
    {noreply, State}.

%% terminate/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:terminate-2">gen_server:terminate/2</a>
-spec terminate(Reason, State :: term()) -> Any :: term() when
    Reason :: normal
            | shutdown
            | {shutdown, term()}
            | term().
%% ====================================================================
terminate(_Reason, _State) ->
    ok.


%% code_change/3
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:code_change-3">gen_server:code_change/3</a>
-spec code_change(OldVsn, State :: term(), Extra :: term()) -> Result when
    Result :: {ok, NewState :: term()} | {error, Reason :: term()},
    OldVsn :: Vsn | {down, Vsn},
    Vsn :: term().
%% ====================================================================
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.




On Wed, Dec 9, 2015 at 4:27 PM, Fred Hebert <mononcqc@REDACTED> wrote:

> On 12/09, Bogdan Andu wrote:
>
>> handle_info({udp, Socket, Host, Port, Bin}, State)  ->
>>    {noreply, State};
>>
>> In a few minutes the memory allocated to binaries increases to ~500 MB
>> when running the command:
>>
>> ffmpeg -f lavfi -i aevalsrc="sin(40*2*PI*t)" -ar 8000 -f mulaw -f rtp
>> rtp://10.10.13.104:5004
>>
>> It seems that the Bins accumulate in the process's binary memory area
>> and are never deallocated unless the process is killed, which is not an
>> option.
>>
>
> So there are two ways to look at that. If the size of a binary is > 64
> bytes, the binary is stored on a global shared heap, and is only collected
> once no process at all has a reference to it anymore.
>
> This takes a few forms:
>
> 1. the process receiving the data and passing it on holds on to it
>    inadvertently by not garbage-collecting,
> 2. the processes the messages would be forwarded to are keeping a copy, or
> 3. the process isn't keeping up.
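>
> Before anything else, it's worth measuring where the binaries actually
> live. A minimal sketch using plain OTP introspection calls (nothing
> specific to your code; Pid is whichever process owns the socket, e.g.
> whereis(mps_controller) if it is registered under that name):
>
> %% total bytes held in refc (off-heap) binaries across the whole node
> erlang:memory(binary).
>
> %% which refc binaries this particular process still references;
> %% each entry is roughly {Id, Size, RefCount} (the exact format is
> %% undocumented and may change)
> erlang:process_info(Pid, binary).
>
> %% how many messages are sitting unread in its mailbox
> erlang:process_info(Pid, message_queue_len).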
>
> In this case it would be odd to blame 2) or 3), since the snippet above
> does not forward the data, and because "not keeping up" would affect both
> forms the same (including the following one, I mean):
>
>> If I change the clause to:
>>
>> handle_info({udp1, Socket, Host, Port, Bin}, State) ->
>>
>> then as long as the process receives packets, the data still accumulates
>> in the binary memory area and is never deallocated.
>>
>
> So the interesting bit there is: are you sure the memory isn't just going
> elsewhere, and that you're doing nothing with the process? Not matching on
> the message does not mean the message isn't read. Any receive operation
> (or most of them, as far as I can tell) works by taking the messages in
> the mailbox, copying them to the process heap, potentially running a GC,
> and then getting the message.
>
> Simply not matching the message does not take it out of the mailbox; in
> fact I would expect bigger leaks with that clause, unless you have a
> second one that broadly matches on all messages and discards them.
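>
> A minimal sketch of such a catch-all clause (essentially what the module
> above already ends its handle_info/2 with):
>
> %% match and drop anything we did not explicitly handle, so stray
> %% messages cannot pile up in the mailbox
> handle_info(_Other, State) ->
>     {noreply, State}.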
>
> Then the problem would point at being about what you do with the message.
>
> The thing I would check is:
> a) are you keeping more references than you think in what you do, or is
>
>> handle_info({udp, Socket, Host, Port, Bin}, State)  ->
>>    {noreply, State};
>>
>
> really the full clause?
>
> Another option can be to return {noreply, State, hibernate} from time to
> time, which will force a full GC and a recompaction of the process memory.
> If the memory isn't shed after that, then you know the data is either
> still referenced by the process or referenced by another process.
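>
> A rough sketch of what that could look like in the udp clause (here the
> socket is re-armed with {active, once} directly instead of via the 1 ms
> timeout, and hibernation happens on a random sample of packets;
> rand:uniform/1 assumes OTP 18+):
>
> handle_info({udp, _Socket, _Host, _Port, _Bin},
>             #udp_conn_state{sock = Sock} = State) ->
>     inet:setopts(Sock, [{active, once}]),   %% re-arm for the next packet
>     case rand:uniform(1000) of
>         1 -> {noreply, State, hibernate};   %% occasional full GC + compaction
>         _ -> {noreply, State}
>     end;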
>
> Regards,
> Fred.
>
>
>

