Browse Source

integration: use unix domain sockets for all connections

Anthony Romano 9 năm trước cách đây
mục cha
commit
13d0ea7f54
3 tập tin đã thay đổi với 31 bổ sung58 xóa
  1. 3 6
      integration/bridge.go
  2. 25 47
      integration/cluster.go
  3. 3 5
      integration/v2_http_kv_test.go

+ 3 - 6
integration/bridge.go

@@ -18,8 +18,9 @@ import (
 	"fmt"
 	"io"
 	"net"
-	"os"
 	"sync"
+
+	"github.com/coreos/etcd/pkg/transport"
 )
 
 // bridge creates a unix socket bridge to another unix socket, making it possible
@@ -43,10 +44,7 @@ func newBridge(addr string) (*bridge, error) {
 		conns:   make(map[*bridgeConn]struct{}),
 		stopc:   make(chan struct{}, 1),
 	}
-	if err := os.RemoveAll(b.inaddr); err != nil {
-		return nil, err
-	}
-	l, err := net.Listen("unix", b.inaddr)
+	l, err := transport.NewUnixListener(b.inaddr)
 	if err != nil {
 		return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
 	}
@@ -79,7 +77,6 @@ func (b *bridge) Reset() {
 func (b *bridge) serveListen() {
 	defer func() {
 		b.l.Close()
-		os.RemoveAll(b.inaddr)
 		b.mu.Lock()
 		for bc := range b.conns {
 			bc.Close()

+ 25 - 47
integration/cluster.go

@@ -25,7 +25,6 @@ import (
 	"os"
 	"reflect"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -53,14 +52,18 @@ const (
 	tickDuration   = 10 * time.Millisecond
 	clusterName    = "etcd"
 	requestTimeout = 20 * time.Second
+
+	basePort     = 21000
+	urlScheme    = "unix"
+	urlSchemeTLS = "unixs"
 )
 
 var (
 	electionTicks = 10
 
-	// integration test uses well-known ports to listen for each running member,
-	// which ensures restarted member could listen on specific port again.
-	nextListenPort int64 = 21000
+	// integration test uses unique ports, counting up, to listen for each
+	// member, ensuring restarted members can listen on the same port again.
+	localListenCount int64 = 0
 
 	testTLSInfo = transport.TLSInfo{
 		KeyFile:        "./fixtures/server.key.insecure",
@@ -91,6 +94,13 @@ func init() {
 	api.EnableCapability(api.V3rpcCapability)
 }
 
+func schemeFromTLSInfo(tls *transport.TLSInfo) string {
+	if tls == nil {
+		return urlScheme
+	}
+	return urlSchemeTLS
+}
+
 func (c *cluster) fillClusterForMembers() error {
 	if c.cfg.DiscoveryURL != "" {
 		// cluster will be discovered
@@ -99,10 +109,7 @@ func (c *cluster) fillClusterForMembers() error {
 
 	addrs := make([]string, 0)
 	for _, m := range c.Members {
-		scheme := "http"
-		if m.PeerTLSInfo != nil {
-			scheme = "https"
-		}
+		scheme := schemeFromTLSInfo(m.PeerTLSInfo)
 		for _, l := range m.PeerListeners {
 			addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
 		}
@@ -186,13 +193,8 @@ func (c *cluster) URLs() []string {
 func (c *cluster) HTTPMembers() []client.Member {
 	ms := []client.Member{}
 	for _, m := range c.Members {
-		pScheme, cScheme := "http", "http"
-		if m.PeerTLSInfo != nil {
-			pScheme = "https"
-		}
-		if m.ClientTLSInfo != nil {
-			cScheme = "https"
-		}
+		pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
+		cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
 		cm := client.Member{Name: m.Name}
 		for _, ln := range m.PeerListeners {
 			cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
@@ -225,10 +227,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
 func (c *cluster) addMember(t *testing.T) {
 	m := c.mustNewMember(t)
 
-	scheme := "http"
-	if c.cfg.PeerTLS != nil {
-		scheme = "https"
-	}
+	scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
 
 	// send add request to the cluster
 	var err error
@@ -390,26 +389,13 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
 }
 
 func newLocalListener(t *testing.T) net.Listener {
-	port := atomic.AddInt64(&nextListenPort, 1)
-	l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
-	if err != nil {
-		t.Fatal(err)
-	}
-	return l
+	c := atomic.AddInt64(&localListenCount, 1)
+	addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
+	return newListenerWithAddr(t, addr)
 }
 
 func newListenerWithAddr(t *testing.T, addr string) net.Listener {
-	var err error
-	var l net.Listener
-	// TODO: we want to reuse a previous closed port immediately.
-	// a better way is to set SO_REUSExx instead of doing retry.
-	for i := 0; i < 5; i++ {
-		l, err = net.Listen("tcp", addr)
-		if err == nil {
-			break
-		}
-		time.Sleep(500 * time.Millisecond)
-	}
+	l, err := transport.NewUnixListener(addr)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -449,13 +435,8 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 	var err error
 	m := &member{}
 
-	peerScheme, clientScheme := "http", "http"
-	if mcfg.peerTLS != nil {
-		peerScheme = "https"
-	}
-	if mcfg.clientTLS != nil {
-		clientScheme = "https"
-	}
+	peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
+	clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
 
 	pln := newLocalListener(t)
 	m.PeerListeners = []net.Listener{pln}
@@ -500,10 +481,7 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 func (m *member) listenGRPC() error {
 	// prefix with localhost so cert has right domain
 	m.grpcAddr = "localhost:" + m.Name + ".sock"
-	if err := os.RemoveAll(m.grpcAddr); err != nil {
-		return err
-	}
-	l, err := net.Listen("unix", m.grpcAddr)
+	l, err := transport.NewUnixListener(m.grpcAddr)
 	if err != nil {
 		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
 	}

+ 3 - 5
integration/v2_http_kv_test.go

@@ -19,7 +19,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"net"
 	"net/http"
 	"net/url"
 	"reflect"
@@ -28,6 +27,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/pkg/capnslog"
 )
 
@@ -1038,10 +1038,8 @@ type testHttpClient struct {
 
 // Creates a new HTTP client with KeepAlive disabled.
 func NewTestClient() *testHttpClient {
-	tr := &http.Transport{
-		Dial:              (&net.Dialer{Timeout: time.Second}).Dial,
-		DisableKeepAlives: true,
-	}
+	tr, _ := transport.NewTransport(transport.TLSInfo{}, time.Second)
+	tr.DisableKeepAlives = true
 	return &testHttpClient{&http.Client{Transport: tr}}
 }