[erlang-questions] how to flush a gen_tcp socket on close?

Matthias Lang matthias@REDACTED
Thu Apr 5 00:13:34 CEST 2012


On Tuesday, April 03, Andreas Schultz wrote:

> In my case the receiver is too slow to process all the data, sender
> does 10k packets of 1k size, the receiver only gets the first 2000
> packets. It might well be that I hit the close timeout and inet
> discards the rest of the send queue.

Ok, that's something to go on.

I had a go at reproducing what you're doing based on that description.
I see something unexpected which seems similar to what you reported.

The output of my program (at the bottom of this mail) running on R15B is:

   30> as_tcp:go().
   calling close on TX at {23,35,35}
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   read 1000 octets
   close returned at {23,35,45}, 10002ms later
   ok
   read 1000 octets
   31>
   =ERROR REPORT==== 4-Apr-2012::23:35:45 ===
   Error in process <0.120.0> with exit value: {{badmatch,{error,closed}},[{as_tcp,slow_read,1,[{file,"as_tcp.erl"},{line,31}]}]}

I had a bit of a dig in prim_inet.erl. It sounds like you've looked
there too. That code looks like it's intended to loop 'forever' trying
to send the queued data, as long as some progress is made every so often
(i.e. at least some data goes out in every 5s timeout period). But running
my program suggests that isn't happening as intended.

Instrumenting prim_inet, it looks like the {subs_empty_out_q,0} message
is coming up from inet_drv.c even though the queue isn't empty. Looking
in there, I noticed this:

   static void tcp_inet_flush(ErlDrvData e)
   {
       tcp_descriptor* desc = (tcp_descriptor*)e;
       if (!(desc->inet.event_mask & FD_WRITE)) {
           /* Discard send queue to avoid hanging port (OTP-7615) */
           tcp_clear_output(desc);
       }
   }

but I think that change was introduced sometime after R11B-5, and
R11B-5 fails the same way. So probably a red herring.

Out of time looking at this for now.

> The fix should be simple, limit the send queue size.

To what?

Zero seems to be the only value that will work even for arbitrarily slow
clients. And that defeats the point of having a send queue.

---

A likely _workaround_ is to call

  inet:getstat(Tx, [send_pend])

before closing. If the answer is zero, you know that you can call close()
without _erlang_ tossing data. I haven't tried this.
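
A minimal sketch of that workaround, untested against the problem above.
The module name drain_close, the function close_when_drained/2 and the
100ms poll interval are made up for illustration. It assumes send_pend
reports the bytes still queued for the socket on the Erlang side; it says
nothing about data already handed to the kernel.

```erlang
-module(drain_close).
-export([close_when_drained/2]).

%% Hypothetical helper: poll send_pend until the queue is empty, then
%% close. Gives up after roughly TimeoutMs and leaves the socket open so
%% the caller can decide what to do with the unsent data.
close_when_drained(Sock, TimeoutMs) ->
    wait(Sock, TimeoutMs, 100).

wait(Sock, Remaining, Step) ->
    case inet:getstat(Sock, [send_pend]) of
        {ok, [{send_pend, 0}]} ->
            %% Nothing queued any more: close should not discard data.
            gen_tcp:close(Sock);
        {ok, [{send_pend, N}]} when Remaining =< 0 ->
            %% Out of time with N octets still queued; socket stays open.
            {error, {send_pend, N}};
        {ok, _} ->
            timer:sleep(Step),
            wait(Sock, Remaining - Step, Step);
        {error, _} = Err ->
            Err
    end.
```

In as_tcp:go/0 this would replace the plain gen_tcp:close(Tx), e.g.
drain_close:close_when_drained(Tx, 60000).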

---

It's late, I might have outsmarted myself, but my current feeling is
that erlang is quietly tossing data and it shouldn't be.

Waiting for as long as it takes in close() seems like the right thing,
though Per might disagree. Waiting for N seconds in close() and then
returning an error if the queue didn't empty would also be better than
just quietly tossing it.

(And: yes, I know, application-level ACKs would avoid this
problem. But I'm not quite ready to say that this problem can't be
fixed.)
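
For comparison, a rough sketch of what an application-level ACK could
look like. Everything here is an assumption made for illustration: the
module name ack_close, both function names, the 3-octet <<"ack">> reply,
and the receiver knowing the payload size in advance. Both sockets are
assumed to be opened with [binary, {active, false}].

```erlang
-module(ack_close).
-export([send_and_wait_ack/3, recv_all_and_ack/2]).

%% Sender side: send Data, then block until the peer confirms it has
%% read everything (or TimeoutMs passes) before closing the socket.
send_and_wait_ack(Sock, Data, TimeoutMs) ->
    ok = gen_tcp:send(Sock, Data),
    Result = case gen_tcp:recv(Sock, 3, TimeoutMs) of
                 {ok, <<"ack">>} -> ok;
                 {ok, Other}     -> {error, {unexpected, Other}};
                 {error, Reason} -> {error, Reason}
             end,
    gen_tcp:close(Sock),
    Result.

%% Receiver side: read exactly Size octets, then acknowledge them.
recv_all_and_ack(Sock, Size) ->
    {ok, Bin} = gen_tcp:recv(Sock, Size),
    ok = gen_tcp:send(Sock, <<"ack">>),
    {ok, Bin}.
```

With this the sender never calls close() while its own data is still
queued, so the behaviour of close() stops mattering.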

Matt

----------------------------------------------------------------------
%% Throwaway module: attempt to reproduce TCP problem reported to
%% erlang-questions 2012-04-02 by Andreas Schultz.
%%
%% He elaborated on 2012-04-04: gen_tcp drops data when close is called
%% with data buffered if the receiver is sufficiently slow.
-module(as_tcp).
-export([go/0]).

go() ->
    {ok, L} = gen_tcp:listen(0, [{active, false}, binary]),
    {ok, Portno} = inet:port(L),
    {ok, Tx} = gen_tcp:connect(localhost, Portno, []),
    {ok, Rx} = gen_tcp:accept(L),
    ok = gen_tcp:close(L),

    One_hundred_k = list_to_binary(lists:duplicate(100000, 0)),
    ok = gen_tcp:send(Tx, [lists:duplicate(40, One_hundred_k), "end of data"]),

    spawn(fun() -> slow_read(Rx) end),

    io:fwrite("calling close on TX at ~p\n", [time()]),
    Before = now(),
    ok = gen_tcp:close(Tx),
    After = now(),
    io:fwrite("close returned at ~p, ~pms later\n",
              [time(), timer:now_diff(After, Before) div 1000]),

    ok = gen_tcp:close(Rx).

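%% Deliberately slow reader. Each recv takes exactly 10000 octets, even
%% though the message printed below says 1000. Crashes with a badmatch
%% once go/0 closes Rx, which is the ERROR REPORT in the output above.
slow_read(Rx) ->
    {ok, _Bin} = gen_tcp:recv(Rx, 10000),
    timer:sleep(1000),
    io:fwrite("read 1000 octets\n"),
    slow_read(Rx).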



