[erlang-questions] A possible problem with the canonical listener idiom
Chris Newcombe
chris.newcombe@REDACTED
Fri Jan 2 16:31:08 CET 2009
> a race condition in
> the canonical example listener under serious load which can lead to the
> initial packet(s) from an active or once-active socket being delivered to
> the accepting process rather than the handling process.
When I first started using gen_tcp in active mode I was worried about
this too. So I did a little digging and found that
inet:controlling_process contains code that 'fixes' the race
condition. See the code from inet.erl (in R12B-2), pasted below for
convenience.
i.e. inet:controlling_process specifically looks for any packets that
have arrived at the 'wrong' (old owner) process and moves them to the
new owner process. It correctly preserves any semantic-significance
in the order of packet messages that it moves (it moves data packets
in order, and then moves 'closed' messages etc. after them). And it
also ensures that such 'old' packets are delivered before any newer
packets that arrive while controlling_process is moving the old
packets (it temporarily changes the socket to 'active false' mode
while doing the move, and then restores the previous 'active' mode).
Note that controlling_process may only be called by the current owner
of the socket (i.e. the 'acceptor' process), so this mechanism is
guaranteed to have access to the mailbox that might contain 'old'
packets. It seems safe to me.
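
The forwarding step described above can be tried without any sockets. The following is only a minimal sketch, not the code from inet.erl: the module name forward_sketch, the function names, and the use of a reference (Tag) as a stand-in for the socket are all assumptions made for illustration. It drains the socket-style messages already sitting in the caller's mailbox and resends them to the new owner in mailbox order, leaving unrelated messages untouched.

```erlang
-module(forward_sketch).
-export([forward_pending/2, demo/0]).

%% Forward every {tcp, Tag, _} and {tcp_closed, Tag} message that is
%% already in the caller's mailbox to NewOwner, in mailbox order.
%% Returns true if a 'closed' message was among them, false otherwise.
%% Tag is a hypothetical stand-in for the socket.
forward_pending(Tag, NewOwner) ->
    forward_pending(Tag, NewOwner, false).

forward_pending(Tag, NewOwner, Closed) ->
    receive
        {tcp, Tag, Data} ->
            NewOwner ! {tcp, Tag, Data},
            forward_pending(Tag, NewOwner, Closed);
        {tcp_closed, Tag} ->
            NewOwner ! {tcp_closed, Tag},
            forward_pending(Tag, NewOwner, true)
    after 0 ->
        %% Nothing (more) pending: report whether 'closed' was seen.
        Closed
    end.

%% Queue some fake socket messages in our own mailbox, forward them to
%% a collector process, and return {Tag, MessagesSeenByCollector}.
demo() ->
    Tag = make_ref(),
    Parent = self(),
    self() ! {tcp, Tag, <<"data1">>},
    self() ! unrelated,
    self() ! {tcp, Tag, <<"data2">>},
    self() ! {tcp_closed, Tag},
    Collector = spawn(fun() -> collect(Parent, 3, []) end),
    true = forward_pending(Tag, Collector),
    %% The unrelated message was not forwarded; remove it again.
    receive unrelated -> ok after 0 -> ok end,
    receive
        {collected, Msgs} -> {Tag, Msgs}
    after 1000 ->
        timeout
    end.

%% Collect N messages in arrival order and report them to Parent.
collect(Parent, 0, Acc) ->
    Parent ! {collected, lists:reverse(Acc)};
collect(Parent, N, Acc) ->
    receive
        Msg -> collect(Parent, N - 1, [Msg | Acc])
    end.
```

Calling forward_sketch:demo() should give back the two data messages followed by the closed message, in the order they were queued. The real function additionally switches the socket to passive mode first, so that nothing new can arrive in the old mailbox while the move is in progress.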
I wrote a simple test (also pasted below) to verify that 'old' packets
are in fact moved, and their relative order is preserved.
Chris
From erlang/lib/kernel-2.12.2/src/inet.erl
%% Set controlling process for TCP socket.
tcp_controlling_process(S, NewOwner) when is_port(S), is_pid(NewOwner) ->
    case erlang:port_info(S, connected) of
        {connected, Pid} when Pid =/= self() ->
            {error, not_owner};
        undefined ->
            {error, einval};
        _ ->
            case prim_inet:getopt(S, active) of
                {ok, A0} ->
                    prim_inet:setopt(S, active, false),
                    case tcp_sync_input(S, NewOwner, false) of
                        true ->
                            %% socket already closed,
                            ok;
                        false ->
                            case catch erlang:port_connect(S, NewOwner) of
                                true ->
                                    unlink(S), %% unlink from port
                                    prim_inet:setopt(S, active, A0),
                                    ok;
                                {'EXIT', Reason} ->
                                    {error, Reason}
                            end
                    end;
                Error ->
                    Error
            end
    end.
tcp_sync_input(S, Owner, Flag) ->
    receive
        {tcp, S, Data} ->
            Owner ! {tcp, S, Data},
            tcp_sync_input(S, Owner, Flag);
        {tcp_closed, S} ->
            Owner ! {tcp_closed, S},
            tcp_sync_input(S, Owner, true);
        {S, {data, Data}} ->
            Owner ! {S, {data, Data}},
            tcp_sync_input(S, Owner, Flag);
        {inet_async, S, Ref, Status} ->
            Owner ! {inet_async, S, Ref, Status},
            tcp_sync_input(S, Owner, Flag);
        {inet_reply, S, Status} ->
            Owner ! {inet_reply, S, Status},
            tcp_sync_input(S, Owner, Flag)
    after 0 ->
        Flag
    end.
My test program:
%% Run with
%%
%%   erlc -v -W bug_controlling_process.erl && erl -noshell -s bug_controlling_process main
-module(bug_controlling_process).
-export([main/0]).
main() ->
    Port = 40000,
    _Acceptor = spawn_link(
                  fun() ->
                          {ok, ListenSocket}
                              = gen_tcp:listen(Port,
                                               [{reuseaddr, true},
                                                binary,
                                                {packet, 2},
                                                {active, true},
                                                {exit_on_close, false}]),
                          acceptor(ListenSocket)
                  end),
    %% Send data to the port and then close the connection
    {ok, ClientSocket}
        = gen_tcp:connect("localhost", Port, [binary, {packet, 2}]),
    gen_tcp:send(ClientSocket, "data1"),
    gen_tcp:send(ClientSocket, "data2"),
    gen_tcp:shutdown(ClientSocket, write),
    %%gen_tcp:close(ClientSocket),
    timer:sleep(infinity).
acceptor(ListenSocket) ->
    case gen_tcp:accept(ListenSocket) of
        {error, Reason} ->
            exit({accept_error, Reason});
        {ok, Socket} ->
            Worker = spawn_link(fun() -> order_sensitive_worker(Socket) end),
            %% Increase the probability of the race-condition by
            %% waiting until some data has arrived on the socket
            %% before we transfer ownership
            timer:sleep(1000),
            %% Confirm that we have the expected messages
            %% {messages, [{tcp, Socket, <<"data1">>},
            %%             {tcp, Socket, <<"data2">>},
            %%             {tcp_closed, Socket}
            %%            ]}
            %%     = process_info(self(), messages),
            io:format("messages before transferring ownership: ~p~n",
                      [process_info(self(), messages)]),
            %% Now transfer ownership
            ok = gen_tcp:controlling_process(Socket, Worker),
            timer:sleep(infinity)
    end.
order_sensitive_worker(Socket) ->
    io:format("order_sensitive_worker waiting for active messages on socket ~w~n",
              [Socket]),
    order_sensitive_worker_loop().

order_sensitive_worker_loop() ->
    receive
        Msg ->
            io:format("Received ~p~n", [Msg]),
            order_sensitive_worker_loop()
    end.
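
For comparison with the test above, here is a sketch of a hand-off that does not rely on controlling_process moving messages at all: the listen socket is opened with {active, false}, so the accepted socket stays passive until the worker itself enables {active, once}. This is only an illustration under my own assumptions, not something taken from inet.erl or from the original post; the module name passive_handoff_sketch, the {socket_ready, Socket} message, and the {worker_saw, Packets} reply are all made up for the example.

```erlang
-module(passive_handoff_sketch).
-export([run/0]).

%% Start a listener on an ephemeral port, send two packets from a
%% client, and return the packets the worker saw, in order.
run() ->
    {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 2},
                                      {active, false},
                                      {reuseaddr, true}]),
    {ok, Port} = inet:port(Listen),
    Parent = self(),
    spawn_link(fun() -> acceptor(Listen, Parent) end),
    {ok, Client} = gen_tcp:connect({127,0,0,1}, Port,
                                   [binary, {packet, 2}]),
    ok = gen_tcp:send(Client, "data1"),
    ok = gen_tcp:send(Client, "data2"),
    ok = gen_tcp:close(Client),
    Result = receive
                 {worker_saw, Packets} -> Packets
             after 5000 ->
                 timeout
             end,
    gen_tcp:close(Listen),
    Result.

%% The accepted socket inherits {active, false} from the listen
%% socket, so no packet messages can reach the acceptor's mailbox.
acceptor(Listen, Parent) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    Worker = spawn_link(fun() -> worker_init(Parent) end),
    ok = gen_tcp:controlling_process(Socket, Worker),
    %% Hypothetical hand-off message: tells the worker it now owns the socket.
    Worker ! {socket_ready, Socket},
    ok.

worker_init(Parent) ->
    receive
        {socket_ready, Socket} -> worker_loop(Socket, Parent, [])
    end.

%% Ask for one message at a time; re-arm after every packet.
worker_loop(Socket, Parent, Acc) ->
    %% The return value is ignored on purpose: if the socket is already
    %% closed, the tcp_closed message is already in our mailbox.
    _ = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, Data} ->
            worker_loop(Socket, Parent, [Data | Acc]);
        {tcp_closed, Socket} ->
            Parent ! {worker_saw, lists:reverse(Acc)}
    end.
```

The trade-off is an extra message and an extra setopts call per connection. Given the forwarding code in controlling_process shown above, the plain active-mode hand-off appears to be safe as well, so this variant is a matter of taste rather than a required fix.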
2008/12/31 John Haugeland <stonecypher@REDACTED>:
> I'm not really 100% on this, but I think there might be a race condition in
> the canonical example listener under serious load which can lead to the
> initial packet(s) from an active or once-active socket being delivered to
> the accepting process rather than the handling process. I've documented
> what I believe the problem is, and how to fix it (including a standard fix
> in my utility library scutil, under the name standard_listener), here:
>
> http://fullof.bs/a-better-erlang-tcp-listening-pattern-addressingthe-fast-packet-loss-problem
>
> I would appreciate commentary. If I'm wrong, please let me know.