Browse Source

integration: add Blackhole to bridgeConn

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 years ago
parent
commit
741d7e9dca
2 changed files with 59 additions and 10 deletions
  1. 57 10
      integration/bridge.go
  2. 2 0
      integration/cluster.go

+ 57 - 10
integration/bridge.go

@@ -17,6 +17,7 @@ package integration
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"sync"
 
@@ -31,9 +32,10 @@ type bridge struct {
 	l       net.Listener
 	conns   map[*bridgeConn]struct{}
 
-	stopc  chan struct{}
-	pausec chan struct{}
-	wg     sync.WaitGroup
+	stopc      chan struct{}
+	pausec     chan struct{}
+	blackholec chan struct{}
+	wg         sync.WaitGroup
 
 	mu sync.Mutex
 }
@@ -41,11 +43,12 @@ type bridge struct {
 func newBridge(addr string) (*bridge, error) {
 	b := &bridge{
 		// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
-		inaddr:  addr + "0",
-		outaddr: addr,
-		conns:   make(map[*bridgeConn]struct{}),
-		stopc:   make(chan struct{}),
-		pausec:  make(chan struct{}),
+		inaddr:     addr + "0",
+		outaddr:    addr,
+		conns:      make(map[*bridgeConn]struct{}),
+		stopc:      make(chan struct{}),
+		pausec:     make(chan struct{}),
+		blackholec: make(chan struct{}),
 	}
 	close(b.pausec)
 
@@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
-		io.Copy(bc.out, bc.in)
+		b.ioCopy(bc, bc.out, bc.in)
 		bc.close()
 		wg.Done()
 	}()
 	go func() {
-		io.Copy(bc.in, bc.out)
+		b.ioCopy(bc, bc.in, bc.out)
 		bc.close()
 		wg.Done()
 	}()
@@ -179,3 +182,47 @@ func (bc *bridgeConn) close() {
 	bc.in.Close()
 	bc.out.Close()
 }
+
+func (b *bridge) Blackhole() {
+	b.mu.Lock()
+	close(b.blackholec)
+	b.mu.Unlock()
+}
+
+func (b *bridge) Unblackhole() {
+	b.mu.Lock()
+	for bc := range b.conns {
+		bc.Close()
+	}
+	b.conns = make(map[*bridgeConn]struct{})
+	b.blackholec = make(chan struct{})
+	b.mu.Unlock()
+}
+
+// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
+func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) {
+	buf := make([]byte, 32*1024)
+	for {
+		select {
+		case <-b.blackholec:
+			io.Copy(ioutil.Discard, src)
+			return nil
+		default:
+		}
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if ew != nil {
+				return ew
+			}
+			if nr != nw {
+				return io.ErrShortWrite
+			}
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return
+}

+ 2 - 0
integration/cluster.go

@@ -588,6 +588,8 @@ func (m *member) ID() types.ID { return m.s.ID() }
 func (m *member) DropConnections()    { m.grpcBridge.Reset() }
 func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
 func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
+func (m *member) Blackhole()          { m.grpcBridge.Blackhole() }
+func (m *member) Unblackhole()        { m.grpcBridge.Unblackhole() }
 
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {