[erlang-questions] UDP concurrent server
Bogdan Andu
Wed Dec 9 16:46:41 CET 2015
the so-called controller is a simple gen_server and is started by a
supervisor when the application starts
supervisor snippet:
%% Supervisor callback. Reads the listening port and address from the
%% `config' ETS table and declares a single mps_controller worker that is
%% started with those two values.
init([]) ->
    [{port, Port}] = ets:lookup(config, port),
    [{listen, IPv4}] = ets:lookup(config, listen),
    StartMFA = {mps_controller, start_link, [Port, IPv4]},
    ControllerSpec = {mps_controller, StartMFA, temporary, 5000, worker,
                      [mps_controller]},
    SupFlags = {one_for_all, 3, 10},
    {ok, {SupFlags, [ControllerSpec]}}.
and controller:
%% gen_server callback exports.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).
%% ====================================================================
%% API functions
%% ====================================================================
-export([start_link/2, stop/0]).
%% ====================================================================
%% Behavioural functions
%% ====================================================================
%% -record(state, {}).
%% @doc Starts the controller and registers it locally under the module name
%% so that stop/0 can address it. Port and Ip are passed on to init/1.
start_link(Port, Ip) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Ip], []).

%% @doc Stops the locally registered controller with reason `normal' and
%% returns ok once it has terminated. Exits with `noproc' if the controller
%% is not running.
stop() ->
    gen_server:stop(?MODULE).
%% init/1
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:init-1">gen_server:init/1</a>
%% Opens a UDP socket on Port bound to Ip, in passive binary mode, and keeps
%% it in the server state. The process traps exits. The 0 ms timeout in the
%% return value makes gen_server deliver `timeout' straight away; the
%% handle_info(timeout, ...) clause then switches the socket to
%% {active, once}. A failure to open the socket crashes init with badmatch.
-spec init(Args :: term()) -> Result when
      Result :: {ok, State}
              | {ok, State, Timeout}
              | {ok, State, hibernate}
              | {stop, Reason :: term()}
              | ignore,
      State :: term(),
      Timeout :: non_neg_integer() | infinity.
%% ====================================================================
init([Port, Ip]) ->
    process_flag(trap_exit, true),
    {ok, Sock} = gen_udp:open(Port, [binary,
                                     {active, false},
                                     {reuseaddr, true},
                                     {ip, Ip}]),
    {ok, #udp_conn_state{sock = Sock}, 0}.
%% handle_call/3
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_call-3">gen_server:handle_call/3</a>
%% Answers every synchronous request with `ok' and leaves the state untouched.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State
                  :: term()) -> Result when
      Result :: {reply, Reply, NewState}
              | {reply, Reply, NewState, Timeout}
              | {reply, Reply, NewState, hibernate}
              | {noreply, NewState}
              | {noreply, NewState, Timeout}
              | {noreply, NewState, hibernate}
              | {stop, Reason, Reply, NewState}
              | {stop, Reason, NewState},
      Reply :: term(),
      NewState :: term(),
      Timeout :: non_neg_integer() | infinity,
      Reason :: term().
%% ====================================================================
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% handle_cast/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_cast-2">gen_server:handle_cast/2</a>
%% Ignores every asynchronous request; the state is returned unchanged.
-spec handle_cast(Request :: term(), State :: term()) -> Result when
      Result :: {noreply, NewState}
              | {noreply, NewState, Timeout}
              | {noreply, NewState, hibernate}
              | {stop, Reason :: term(), NewState},
      NewState :: term(),
      Timeout :: non_neg_integer() | infinity.
%% ====================================================================
handle_cast(_Msg, State) ->
    {noreply, State}.
%% handle_info/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:handle_info-2">gen_server:handle_info/2</a>
%% Handles raw messages:
%%   {udp, Socket, Host, Port, Bin} - a datagram; the payload is discarded and
%%                                    the socket is re-armed with {active, once}.
%%   timeout                        - requested by init/1; arms the socket for
%%                                    the first datagram.
%%   anything else                  - ignored.
-spec handle_info(Info :: timeout | term(), State :: term()) -> Result when
      Result :: {noreply, NewState}
              | {noreply, NewState, Timeout}
              | {noreply, NewState, hibernate}
              | {stop, Reason :: term(), NewState},
      NewState :: term(),
      Timeout :: non_neg_integer() | infinity.
%% ====================================================================
handle_info({udp, Socket, _Host, _Port, _Bin}, State) ->
    %% Re-arm here rather than through a gen_server timeout: a timeout is
    %% cancelled by any other message that arrives first, which would leave
    %% the socket passive and the server deaf to further datagrams.
    inet:setopts(Socket, [{active, once}]),
    {noreply, State};
handle_info(timeout, #udp_conn_state{sock = Sock} = State) ->
    inet:setopts(Sock, [{active, once}]),
    {noreply, State};
handle_info(_Info, State) ->
    {noreply, State}.
%% terminate/2
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:terminate-2">gen_server:terminate/2</a>
%% Performs no explicit cleanup; simply returns ok for any reason and state.
-spec terminate(Reason, State :: term()) -> Any :: term() when
      Reason :: normal
              | shutdown
              | {shutdown, term()}
              | term().
%% ====================================================================
terminate(_Reason, _State) ->
    ok.
%% code_change/3
%% ====================================================================
%% @doc <a href="http://www.erlang.org/doc/man/gen_server.html#Module:code_change-3">gen_server:code_change/3</a>
%% Keeps the existing state as-is across a code upgrade or downgrade.
-spec code_change(OldVsn, State :: term(), Extra :: term()) -> Result when
      Result :: {ok, NewState :: term()} | {error, Reason :: term()},
      OldVsn :: Vsn | {down, Vsn},
      Vsn :: term().
%% ====================================================================
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
On Wed, Dec 9, 2015 at 4:27 PM, Fred Hebert <mononcqc@REDACTED> wrote:
> On 12/09, Bogdan Andu wrote:
>> handle_info({udp, Socket, Host, Port, Bin}, State) ->
>> {noreply, State};
>> In a few minutes the memory allocated to binary increases to ~ 500MB
>> by running the command:
>> ffmpeg -f lavfi -i aevalsrc="sin(40*2*PI*t)" -ar 8000 -f mulaw -f rtp
>> rtp://
>> It seems that the Bins are accumulated in the process memory area, and
>> never
>> are deallocated
>> unless the process is killed, which is not an option.
> So there's two ways about that. If the size of each binary is > 64 bytes,
> then the binary is moved to a global shared heap, and is only collected
> once no process at all has a reference to it anymore.
> This takes a few forms:
> 1. either the process receiving the data and passing it on holds on to it
> inadvertently by not garbage-collecting
> 2. the processes the messages would be forwarded to are keeping a copy.
> 3. the process isn't keeping up
> In this case, it would be weird for it to be blameable on 2) and 3) since
> the snippet above does not forward data, and because "not keeping up" would
> affect both forms the same (the following one, I mean:)
> If I change the clause to:
>> handle_info({udp1, Socket, Host, Port, Bin}, State) ->
>> So as long as the process receives the packet, this is accumulated in
>> binary memory area
>> and never deallocated.
> So the interesting bit there is are you sure the memory isn't just going
> elsewhere? That you're doing nothing with the process? Not matching on the
> message does not mean the message isn't read. Any receive operation (or
> most of them, as far as I can tell) work by taking the messages in the
> mailbox, copying them to the process heap, potentially running a GC, and
> then getting the message.
> Simply not matching the message does not take it out of the mailbox; in
> fact I would expect bigger leaks with that clause, unless you have a second
> one that broadly matches on all messages and discards them.
> Then the problem would point at being about what you do with the message.
> The thing I would check is:
> a) are you keeping more references than you think in what you do, or is
>> handle_info({udp, Socket, Host, Port, Bin}, State) ->
>> {noreply, State};
> really the full clause?
> Another option can be to return {noreply, State, hibernate} from time to
> time, which will force a full GC and recompaction of the process memory.
> If the memory isn't shed away after that, then you know it's being either
> still referenced by the process, or has another process referencing it.
> Regards,
> Fred.
