Browse Source

Merge pull request #4174 from heyitsanthony/fix-limit-keepalive

etcdmain: support keep alive listeners on limit listener connections
Anthony Romano 10 years ago
parent
commit
f45a8fe623

+ 0 - 4
Godeps/Godeps.json

@@ -167,10 +167,6 @@
 			"ImportPath": "golang.org/x/net/context",
 			"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
 		},
-		{
-			"ImportPath": "golang.org/x/net/netutil",
-			"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
-		},
 		{
 			"ImportPath": "golang.org/x/oauth2",
 			"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"

+ 0 - 103
Godeps/_workspace/src/golang.org/x/net/netutil/listen_test.go

@@ -1,103 +0,0 @@
-// Copyright 2013 The Go Authors.  All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build go1.3
-
-// (We only run this test on Go 1.3 because the HTTP client timeout behavior
-// was bad in previous releases, causing occasional deadlocks.)
-
-package netutil
-
-import (
-	"errors"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"net"
-	"net/http"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-)
-
-func TestLimitListener(t *testing.T) {
-	const (
-		max = 5
-		num = 200
-	)
-
-	l, err := net.Listen("tcp", "127.0.0.1:0")
-	if err != nil {
-		t.Fatalf("Listen: %v", err)
-	}
-	defer l.Close()
-	l = LimitListener(l, max)
-
-	var open int32
-	go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if n := atomic.AddInt32(&open, 1); n > max {
-			t.Errorf("%d open connections, want <= %d", n, max)
-		}
-		defer atomic.AddInt32(&open, -1)
-		time.Sleep(10 * time.Millisecond)
-		fmt.Fprint(w, "some body")
-	}))
-
-	var wg sync.WaitGroup
-	var failed int32
-	for i := 0; i < num; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			c := http.Client{Timeout: 3 * time.Second}
-			r, err := c.Get("http://" + l.Addr().String())
-			if err != nil {
-				t.Logf("Get: %v", err)
-				atomic.AddInt32(&failed, 1)
-				return
-			}
-			defer r.Body.Close()
-			io.Copy(ioutil.Discard, r.Body)
-		}()
-	}
-	wg.Wait()
-
-	// We expect some Gets to fail as the kernel's accept queue is filled,
-	// but most should succeed.
-	if failed >= num/2 {
-		t.Errorf("too many Gets failed: %v", failed)
-	}
-}
-
-type errorListener struct {
-	net.Listener
-}
-
-func (errorListener) Accept() (net.Conn, error) {
-	return nil, errFake
-}
-
-var errFake = errors.New("fake error from errorListener")
-
-// This used to hang.
-func TestLimitListenerError(t *testing.T) {
-	donec := make(chan bool, 1)
-	go func() {
-		const n = 2
-		ll := LimitListener(errorListener{}, n)
-		for i := 0; i < n+1; i++ {
-			_, err := ll.Accept()
-			if err != errFake {
-				t.Fatalf("Accept error = %v; want errFake", err)
-			}
-		}
-		donec <- true
-	}()
-	select {
-	case <-donec:
-	case <-time.After(5 * time.Second):
-		t.Fatal("timeout. deadlock?")
-	}
-}

+ 1 - 2
etcdmain/etcd.go

@@ -31,7 +31,6 @@ import (
 	systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver"
@@ -245,7 +244,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 			if fdLimit <= reservedInternalFDNum {
 				plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
 			}
-			l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
+			l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
 		}
 
 		// Do not wrap around this listener if TLS Info is set.

+ 12 - 7
pkg/transport/keepalive_listener.go

@@ -21,6 +21,11 @@ import (
 	"time"
 )
 
+type keepAliveConn interface {
+	SetKeepAlive(bool) error
+	SetKeepAlivePeriod(d time.Duration) error
+}
+
 // NewKeepAliveListener returns a listener that listens on the given address.
 // Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
 // Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
@@ -50,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	tcpc := c.(*net.TCPConn)
+	kac := c.(keepAliveConn)
 	// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
 	// default on linux:  30 + 8 * 30
 	// default on osx:    30 + 8 * 75
-	tcpc.SetKeepAlive(true)
-	tcpc.SetKeepAlivePeriod(30 * time.Second)
-	return tcpc, nil
+	kac.SetKeepAlive(true)
+	kac.SetKeepAlivePeriod(30 * time.Second)
+	return c, nil
 }
 
 // A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
@@ -72,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
 	if err != nil {
 		return
 	}
-	tcpc := c.(*net.TCPConn)
+	kac := c.(keepAliveConn)
 	// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
 	// default on linux:  30 + 8 * 30
 	// default on osx:    30 + 8 * 75
-	tcpc.SetKeepAlive(true)
-	tcpc.SetKeepAlivePeriod(30 * time.Second)
+	kac.SetKeepAlive(true)
+	kac.SetKeepAlivePeriod(30 * time.Second)
 	c = tls.Server(c, l.config)
 	return
 }

+ 23 - 1
Godeps/_workspace/src/golang.org/x/net/netutil/listen.go → pkg/transport/limit_listen.go

@@ -4,11 +4,17 @@
 
 // Package netutil provides network utility functions, complementing the more
 // common ones in the net package.
-package netutil
+package transport
 
 import (
+	"errors"
 	"net"
 	"sync"
+	"time"
+)
+
+var (
+	ErrNotTCP = errors.New("only tcp connections have keepalive")
 )
 
 // LimitListener returns a Listener that accepts at most n simultaneous
@@ -46,3 +52,19 @@ func (l *limitListenerConn) Close() error {
 	l.releaseOnce.Do(l.release)
 	return err
 }
+
+func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
+	tcpc, ok := l.Conn.(*net.TCPConn)
+	if !ok {
+		return ErrNotTCP
+	}
+	return tcpc.SetKeepAlive(doKeepAlive)
+}
+
+func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
+	tcpc, ok := l.Conn.(*net.TCPConn)
+	if !ok {
+		return ErrNotTCP
+	}
+	return tcpc.SetKeepAlivePeriod(d)
+}