Browse Source

etcdmain: support keep alive listeners on limit listener connections

Fixes #4171
Anthony Romano 10 years ago
parent
commit
811fbc5672
3 changed files with 83 additions and 9 deletions
  1. 1 2
      etcdmain/etcd.go
  2. 12 7
      pkg/transport/keepalive_listener.go
  3. 70 0
      pkg/transport/limit_listen.go

+ 1 - 2
etcdmain/etcd.go

@@ -31,7 +31,6 @@ import (
 	systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver"
@@ -245,7 +244,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 			if fdLimit <= reservedInternalFDNum {
 				plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
 			}
-			l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
+			l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
 		}
 
 		// Do not wrap around this listener if TLS Info is set.

+ 12 - 7
pkg/transport/keepalive_listener.go

@@ -21,6 +21,11 @@ import (
 	"time"
 )
 
+type keepAliveConn interface {
+	SetKeepAlive(bool) error
+	SetKeepAlivePeriod(d time.Duration) error
+}
+
 // NewKeepAliveListener returns a listener that listens on the given address.
 // Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
 // Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
@@ -50,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	tcpc := c.(*net.TCPConn)
+	kac := c.(keepAliveConn)
 	// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
 	// default on linux:  30 + 8 * 30
 	// default on osx:    30 + 8 * 75
-	tcpc.SetKeepAlive(true)
-	tcpc.SetKeepAlivePeriod(30 * time.Second)
-	return tcpc, nil
+	kac.SetKeepAlive(true)
+	kac.SetKeepAlivePeriod(30 * time.Second)
+	return c, nil
 }
 
 // A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
@@ -72,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
 	if err != nil {
 		return
 	}
-	tcpc := c.(*net.TCPConn)
+	kac := c.(keepAliveConn)
 	// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
 	// default on linux:  30 + 8 * 30
 	// default on osx:    30 + 8 * 75
-	tcpc.SetKeepAlive(true)
-	tcpc.SetKeepAlivePeriod(30 * time.Second)
+	kac.SetKeepAlive(true)
+	kac.SetKeepAlivePeriod(30 * time.Second)
 	c = tls.Server(c, l.config)
 	return
 }

+ 70 - 0
pkg/transport/limit_listen.go

@@ -0,0 +1,70 @@
+// Copyright 2013 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package netutil provides network utility functions, complementing the more
+// common ones in the net package.
+package transport
+
+import (
+	"errors"
+	"net"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNotTCP = errors.New("only tcp connections have keepalive")
+)
+
+// LimitListener returns a Listener that accepts at most n simultaneous
+// connections from the provided Listener.
+func LimitListener(l net.Listener, n int) net.Listener {
+	return &limitListener{l, make(chan struct{}, n)}
+}
+
+type limitListener struct {
+	net.Listener
+	sem chan struct{}
+}
+
+func (l *limitListener) acquire() { l.sem <- struct{}{} }
+func (l *limitListener) release() { <-l.sem }
+
+func (l *limitListener) Accept() (net.Conn, error) {
+	l.acquire()
+	c, err := l.Listener.Accept()
+	if err != nil {
+		l.release()
+		return nil, err
+	}
+	return &limitListenerConn{Conn: c, release: l.release}, nil
+}
+
+type limitListenerConn struct {
+	net.Conn
+	releaseOnce sync.Once
+	release     func()
+}
+
+func (l *limitListenerConn) Close() error {
+	err := l.Conn.Close()
+	l.releaseOnce.Do(l.release)
+	return err
+}
+
+func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
+	tcpc, ok := l.Conn.(*net.TCPConn)
+	if !ok {
+		return ErrNotTCP
+	}
+	return tcpc.SetKeepAlive(doKeepAlive)
+}
+
+func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
+	tcpc, ok := l.Conn.(*net.TCPConn)
+	if !ok {
+		return ErrNotTCP
+	}
+	return tcpc.SetKeepAlivePeriod(d)
+}