bridge.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "sync"
  20. "github.com/coreos/etcd/pkg/transport"
  21. )
  22. // bridge creates a unix socket bridge to another unix socket, making it possible
  23. // to disconnect grpc network connections without closing the logical grpc connection.
  24. type bridge struct {
  25. inaddr string
  26. outaddr string
  27. l net.Listener
  28. conns map[*bridgeConn]struct{}
  29. stopc chan struct{}
  30. pausec chan struct{}
  31. wg sync.WaitGroup
  32. mu sync.Mutex
  33. }
  34. func newBridge(addr string) (*bridge, error) {
  35. b := &bridge{
  36. // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
  37. inaddr: addr + "0",
  38. outaddr: addr,
  39. conns: make(map[*bridgeConn]struct{}),
  40. stopc: make(chan struct{}),
  41. pausec: make(chan struct{}),
  42. }
  43. close(b.pausec)
  44. l, err := transport.NewUnixListener(b.inaddr)
  45. if err != nil {
  46. return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
  47. }
  48. b.l = l
  49. b.wg.Add(1)
  50. go b.serveListen()
  51. return b, nil
  52. }
  53. func (b *bridge) URL() string { return "unix://" + b.inaddr }
  54. func (b *bridge) Close() {
  55. b.l.Close()
  56. b.mu.Lock()
  57. select {
  58. case <-b.stopc:
  59. default:
  60. close(b.stopc)
  61. }
  62. b.mu.Unlock()
  63. b.wg.Wait()
  64. }
  65. func (b *bridge) Reset() {
  66. b.mu.Lock()
  67. defer b.mu.Unlock()
  68. for bc := range b.conns {
  69. bc.Close()
  70. }
  71. b.conns = make(map[*bridgeConn]struct{})
  72. }
  73. func (b *bridge) Pause() {
  74. b.mu.Lock()
  75. b.pausec = make(chan struct{})
  76. b.mu.Unlock()
  77. }
  78. func (b *bridge) Unpause() {
  79. b.mu.Lock()
  80. select {
  81. case <-b.pausec:
  82. default:
  83. close(b.pausec)
  84. }
  85. b.mu.Unlock()
  86. }
  87. func (b *bridge) serveListen() {
  88. defer func() {
  89. b.l.Close()
  90. b.mu.Lock()
  91. for bc := range b.conns {
  92. bc.Close()
  93. }
  94. b.mu.Unlock()
  95. b.wg.Done()
  96. }()
  97. for {
  98. inc, ierr := b.l.Accept()
  99. if ierr != nil {
  100. return
  101. }
  102. b.mu.Lock()
  103. pausec := b.pausec
  104. b.mu.Unlock()
  105. select {
  106. case <-b.stopc:
  107. inc.Close()
  108. return
  109. case <-pausec:
  110. }
  111. outc, oerr := net.Dial("unix", b.outaddr)
  112. if oerr != nil {
  113. inc.Close()
  114. return
  115. }
  116. bc := &bridgeConn{inc, outc, make(chan struct{})}
  117. b.wg.Add(1)
  118. b.mu.Lock()
  119. b.conns[bc] = struct{}{}
  120. go b.serveConn(bc)
  121. b.mu.Unlock()
  122. }
  123. }
  124. func (b *bridge) serveConn(bc *bridgeConn) {
  125. defer func() {
  126. close(bc.donec)
  127. bc.Close()
  128. b.mu.Lock()
  129. delete(b.conns, bc)
  130. b.mu.Unlock()
  131. b.wg.Done()
  132. }()
  133. var wg sync.WaitGroup
  134. wg.Add(2)
  135. go func() {
  136. io.Copy(bc.out, bc.in)
  137. bc.close()
  138. wg.Done()
  139. }()
  140. go func() {
  141. io.Copy(bc.in, bc.out)
  142. bc.close()
  143. wg.Done()
  144. }()
  145. wg.Wait()
  146. }
  147. type bridgeConn struct {
  148. in net.Conn
  149. out net.Conn
  150. donec chan struct{}
  151. }
  152. func (bc *bridgeConn) Close() {
  153. bc.close()
  154. <-bc.donec
  155. }
  156. func (bc *bridgeConn) close() {
  157. bc.in.Close()
  158. bc.out.Close()
  159. }