Prechádzať zdrojové kódy

integration: bridge connections to grpc server

Tests need to disconnect the network connection for the client to check
reconnection paths but closing a grpc connection closes the logical connection.
To disconnect the client, instead have a bridge between the server and
the client which can monitor and reset connections.
Anthony Romano 9 rokov pred
rodič
commit
1823702cc6
2 zmenil súbory, kde vykonal 154 pridanie a 1 odobranie
  1. 141 0
      integration/bridge.go
  2. 13 1
      integration/cluster.go

+ 141 - 0
integration/bridge.go

@@ -0,0 +1,141 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"sync"
+)
+
+// bridge creates a unix socket bridge to another unix socket, making it possible
+// to disconnect grpc network connections without closing the logical grpc connection.
+type bridge struct {
+	inaddr  string
+	outaddr string
+	l       net.Listener
+	conns   map[*bridgeConn]struct{}
+
+	stopc chan struct{}
+	wg    sync.WaitGroup
+
+	mu sync.Mutex
+}
+
+func newBridge(addr string) (*bridge, error) {
+	b := &bridge{
+		inaddr:  addr + ".bridge",
+		outaddr: addr,
+		conns:   make(map[*bridgeConn]struct{}),
+		stopc:   make(chan struct{}, 1),
+	}
+	if err := os.RemoveAll(b.inaddr); err != nil {
+		return nil, err
+	}
+	l, err := net.Listen("unix", b.inaddr)
+	if err != nil {
+		return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
+	}
+	b.l = l
+	b.wg.Add(1)
+	go b.serveListen()
+	return b, nil
+}
+
+func (b *bridge) URL() string { return "unix://" + b.inaddr }
+
+func (b *bridge) Close() {
+	b.l.Close()
+	select {
+	case b.stopc <- struct{}{}:
+	default:
+	}
+	b.wg.Wait()
+}
+
+func (b *bridge) Reset() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	for bc := range b.conns {
+		bc.Close()
+	}
+	b.conns = make(map[*bridgeConn]struct{})
+}
+
+func (b *bridge) serveListen() {
+	defer func() {
+		b.l.Close()
+		os.RemoveAll(b.inaddr)
+		b.mu.Lock()
+		for bc := range b.conns {
+			bc.Close()
+		}
+		b.mu.Unlock()
+		b.wg.Done()
+	}()
+
+	for {
+		inc, ierr := b.l.Accept()
+		if ierr != nil {
+			return
+		}
+		outc, oerr := net.Dial("unix", b.outaddr)
+		if oerr != nil {
+			inc.Close()
+			return
+		}
+
+		bc := &bridgeConn{inc, outc}
+		b.wg.Add(1)
+		b.mu.Lock()
+		b.conns[bc] = struct{}{}
+		go b.serveConn(bc)
+		b.mu.Unlock()
+	}
+}
+
+func (b *bridge) serveConn(bc *bridgeConn) {
+	defer func() {
+		bc.Close()
+		b.mu.Lock()
+		delete(b.conns, bc)
+		b.mu.Unlock()
+		b.wg.Done()
+	}()
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		io.Copy(bc.out, bc.in)
+		wg.Done()
+	}()
+	go func() {
+		io.Copy(bc.in, bc.out)
+		wg.Done()
+	}()
+	wg.Wait()
+}
+
+type bridgeConn struct {
+	in  net.Conn
+	out net.Conn
+}
+
+func (bc *bridgeConn) Close() {
+	bc.in.Close()
+	bc.out.Close()
+}

+ 13 - 1
integration/cluster.go

@@ -431,6 +431,7 @@ type member struct {
 
 	grpcServer *grpc.Server
 	grpcAddr   string
+	grpcBridge *bridge
 }
 
 func (m *member) GRPCAddr() string { return m.grpcAddr }
@@ -506,11 +507,18 @@ func (m *member) listenGRPC() error {
 	if err != nil {
 		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
 	}
-	m.grpcAddr = "unix://" + m.grpcAddr
+	m.grpcBridge, err = newBridge(m.grpcAddr)
+	if err != nil {
+		l.Close()
+		return err
+	}
+	m.grpcAddr = m.grpcBridge.URL()
 	m.grpcListener = l
 	return nil
 }
 
+func (m *member) DropConnections() { m.grpcBridge.Reset() }
+
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {
 	if m.grpcAddr == "" {
@@ -659,6 +667,10 @@ func (m *member) Resume() {
 
 // Close stops the member's etcdserver and closes its connections
 func (m *member) Close() {
+	if m.grpcBridge != nil {
+		m.grpcBridge.Close()
+		m.grpcBridge = nil
+	}
 	if m.grpcServer != nil {
 		m.grpcServer.Stop()
 		m.grpcServer = nil