[erlang-questions] how to flush a gen_tcp socket on close?

Per Hedeland <>
Fri Apr 6 15:23:05 CEST 2012


Tony Rogvall <> wrote:
>
>First of all, the right way (send, shutdown, wait for close) is not working here, as would have been
>my first answer :-)

Hm, why do you (too) bring shutdown into this discussion? Surely you are
not suggesting that it should be a requirement to call
gen_tcp:shutdown/2 before gen_tcp:close/1 in order to get buffered data
delivered? The gen_tcp interface is a very nice simplification of the
low-level socket API - why should it be *more* complex than the
low-level API in this particular case?

I will maintain that the only reason to use shutdown is when you want to
do a "half close", or even only when you want to do a "half close in the
send direction". What's the point otherwise?
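
For readers unfamiliar with the term, a "half close in the send direction"
can be sketched in C as follows. This is a minimal illustration assuming a
POSIX system, not anything taken from inet_drv.c: socketpair() stands in
for a real TCP connection, and half_close_demo is a hypothetical name.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Half close in the send direction on a connected stream socket pair.
 * Returns 0 if the peer saw all data followed by EOF and could still
 * reply afterwards, -1 otherwise. (Hypothetical helper for illustration.)
 */
int half_close_demo(void)
{
    int sv[2];
    char buf[16];
    ssize_t n;
    int ok = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return -1;

    /* Side 0 sends its last data, then closes only its send direction. */
    if (write(sv[0], "done", 4) != 4 || shutdown(sv[0], SHUT_WR) != 0)
        goto out;

    /* Side 1 reads the data, then sees EOF (read returns 0). */
    n = read(sv[1], buf, sizeof buf);
    if (n != 4 || memcmp(buf, "done", 4) != 0)
        goto out;
    if (read(sv[1], buf, sizeof buf) != 0)
        goto out;

    /* The other direction is still open: side 1 replies, side 0 reads. */
    if (write(sv[1], "ack", 3) != 3)
        goto out;
    n = read(sv[0], buf, sizeof buf);
    ok = (n == 3 && memcmp(buf, "ack", 3) == 0);

out:
    close(sv[0]);
    close(sv[1]);
    return ok ? 0 : -1;
}
```

The point of shutdown() here is only that side 0 can keep receiving after
signalling end-of-data; a plain close() gives up both directions at once.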

>Then I moved the subscription code to just wait for the empty queue event, and NOT do the timeout. 
>This worked well.

But will potentially block forever.

> My interpretation is that the reader is so slow that the write side's
>"write ready" is not signaled and no more data goes down to kernel space (probably some threshold).
>This keeps the pending data at the same level, hence the timeout.

Agreed.

>So what would a workaround look like?

IMO the *fix* is to not have a user-level queue at all, and thus always
have gen_tcp:send/2 block until everything has been written to the
socket. I don't expect this fix to happen though, but it should at least
be possible to disable the user-level queue, by passing options
{high_watermark, 1}, {low_watermark, 0}. However this doesn't work the
way the code is currently written - send() will never block if the queue
is empty before the call. The patch below fixes this, and with it I can
run Matt's test successfully (after fixing some "bad" assumptions) if I
use the watermark-setting.
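
The behaviour described above can be modelled with a small toy in C. This
is a simplified sketch under stated assumptions, not the driver code: the
struct, the function toy_send and its parameters are all hypothetical, and
the "accepted" argument stands in for however many bytes the kernel takes
on a non-blocking write.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of a driver-side send queue; all names are hypothetical. */
struct toy_port {
    size_t queued;   /* bytes waiting in the user-level queue */
    size_t high;     /* high watermark */
    int    busy;     /* 1 while the sender should be blocked */
};

/*
 * Model one send of `len` bytes when the kernel accepts only `accepted`
 * bytes right now (ignored if data is already queued). `patched` selects
 * whether the watermark is also checked after a partial write into a
 * previously empty queue.
 * Returns 1 if the sender is marked busy, 0 otherwise.
 */
int toy_send(struct toy_port *p, size_t len, size_t accepted, int patched)
{
    assert(accepted <= len);

    if (p->queued > 0) {
        /* Queue already non-empty: enqueue everything, then check. */
        p->queued += len;
        if (p->queued >= p->high) {
            p->busy = 1;
            return 1;
        }
        return 0;
    }

    /* Queue was empty: write what the kernel takes, queue the rest. */
    p->queued = len - accepted;
    if (patched && p->queued > 0 && p->queued >= p->high) {
        p->busy = 1;
        return 1;
    }
    return 0;
}
```

With high set to 1, a 100-byte send of which only 40 bytes are accepted
leaves 60 bytes queued: the unpatched path returns 0 (the caller is not
blocked), while the patched path marks the port busy and returns 1.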

>One way is to make sure that the kernel buffer is smaller (now I can send 160K). Use whatever size
>makes sense given the inet built-in timeout timer (5 secs).
>
>Another way to fix this could be to try to add kernel buffer space to send_pend, but that is
>pretty OS-dependent :-)

Both of these suffer from the effect of "silly window avoidance" - i.e.
even if they improve the coupling "more data is sent" => "send queue
shrinks", they do not help with the coupling "more data is read" =>
"more data is sent". And they don't at all address the "keep trying as
long as the receiver is alive" goal.

--Per


--- otp_src_R15B01/erts/emulator/drivers/common/inet_drv.c.ORIG	2012-04-01 20:15:00.000000000 +0200
+++ otp_src_R15B01/erts/emulator/drivers/common/inet_drv.c	2012-04-06 13:18:59.000000000 +0200
@@ -9396,6 +9396,19 @@
     return -1;
 }
 
+static void tcp_set_busy(tcp_descriptor* desc)
+{
+    DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
+            (long)desc->inet.port, desc->inet.s));
+    desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
+    desc->inet.busy_caller = desc->inet.caller;
+    set_busy_port(desc->inet.port, 1);
+    if (desc->send_timeout != INET_INFINITY) {
+        desc->busy_on_send = 1;
+        driver_set_timer(desc->inet.port, desc->send_timeout);
+    }
+}
+
 /*
 ** Send non-blocking vector data
 */
@@ -9439,15 +9452,7 @@
     if ((sz = driver_sizeq(ix)) > 0) {
 	driver_enqv(ix, ev, 0);
 	if (sz+ev->size >= desc->high) {
-	    DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
-		    (long)desc->inet.port, desc->inet.s));
-	    desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
-	    desc->inet.busy_caller = desc->inet.caller;
-	    set_busy_port(desc->inet.port, 1);
-	    if (desc->send_timeout != INET_INFINITY) {
-		desc->busy_on_send = 1;
-		driver_set_timer(desc->inet.port, desc->send_timeout);
-	    }
+            tcp_set_busy(desc);
 	    return 1;
 	}
     }
@@ -9490,8 +9495,13 @@
 	DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", 
 		(long)desc->inet.port, desc->inet.s));
 	driver_enqv(ix, ev, n); 
-	if (!INETP(desc)->is_ignored)
+	if (!INETP(desc)->is_ignored) {
 	    sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
+            if (driver_sizeq(ix) >= desc->high) {
+                tcp_set_busy(desc);
+                return 1;
+            }
+        }
     }
     return 0;
 }
@@ -9536,15 +9546,7 @@
 	    driver_enq(ix, buf, h_len);
 	driver_enq(ix, ptr, len);
 	if (sz+h_len+len >= desc->high) {
-	    DEBUGF(("tcp_send(%ld): s=%d, sender forced busy\r\n",
-		    (long)desc->inet.port, desc->inet.s));
-	    desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
-	    desc->inet.busy_caller = desc->inet.caller;
-	    set_busy_port(desc->inet.port, 1);
-	    if (desc->send_timeout != INET_INFINITY) {
-		desc->busy_on_send = 1;
-		driver_set_timer(desc->inet.port, desc->send_timeout);
-	    }
+            tcp_set_busy(desc);
 	    return 1;
 	}
     }
@@ -9590,8 +9592,13 @@
 	    n -= h_len;
 	    driver_enq(ix, ptr+n, len-n);
 	}
-	if (!INETP(desc)->is_ignored)
+	if (!INETP(desc)->is_ignored) {
 	    sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
+            if (driver_sizeq(ix) >= desc->high) {
+                tcp_set_busy(desc);
+                return 1;
+            }
+        }
     }
     return 0;
 }


