watch.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "sync"
  17. "golang.org/x/net/context"
  18. "google.golang.org/grpc"
  19. "google.golang.org/grpc/metadata"
  20. "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. )
  25. type watchProxy struct {
  26. cw clientv3.Watcher
  27. ctx context.Context
  28. leader *leader
  29. ranges *watchRanges
  30. // mu protects adding outstanding watch servers through wg.
  31. mu sync.Mutex
  32. // wg waits until all outstanding watch servers quit.
  33. wg sync.WaitGroup
  34. }
  35. func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
  36. cctx, cancel := context.WithCancel(c.Ctx())
  37. wp := &watchProxy{
  38. cw: c.Watcher,
  39. ctx: cctx,
  40. leader: newLeader(c.Ctx(), c.Watcher),
  41. }
  42. wp.ranges = newWatchRanges(wp)
  43. ch := make(chan struct{})
  44. go func() {
  45. defer close(ch)
  46. <-wp.leader.stopNotify()
  47. wp.mu.Lock()
  48. select {
  49. case <-wp.ctx.Done():
  50. case <-wp.leader.disconnectNotify():
  51. cancel()
  52. }
  53. <-wp.ctx.Done()
  54. wp.mu.Unlock()
  55. wp.wg.Wait()
  56. wp.ranges.stop()
  57. }()
  58. return wp, ch
  59. }
  60. func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
  61. wp.mu.Lock()
  62. select {
  63. case <-wp.ctx.Done():
  64. wp.mu.Unlock()
  65. return
  66. default:
  67. wp.wg.Add(1)
  68. }
  69. wp.mu.Unlock()
  70. ctx, cancel := context.WithCancel(stream.Context())
  71. wps := &watchProxyStream{
  72. ranges: wp.ranges,
  73. watchers: make(map[int64]*watcher),
  74. stream: stream,
  75. watchCh: make(chan *pb.WatchResponse, 1024),
  76. ctx: ctx,
  77. cancel: cancel,
  78. }
  79. var lostLeaderC <-chan struct{}
  80. if md, ok := metadata.FromContext(stream.Context()); ok {
  81. v := md[rpctypes.MetadataRequireLeaderKey]
  82. if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
  83. lostLeaderC = wp.leader.lostNotify()
  84. // if leader is known to be lost at creation time, avoid
  85. // letting events through at all
  86. select {
  87. case <-lostLeaderC:
  88. wp.wg.Done()
  89. return rpctypes.ErrNoLeader
  90. default:
  91. }
  92. }
  93. }
  94. // post to stopc => terminate server stream; can't use a waitgroup
  95. // since all goroutines will only terminate after Watch() exits.
  96. stopc := make(chan struct{}, 3)
  97. go func() {
  98. defer func() { stopc <- struct{}{} }()
  99. wps.recvLoop()
  100. }()
  101. go func() {
  102. defer func() { stopc <- struct{}{} }()
  103. wps.sendLoop()
  104. }()
  105. // tear down watch if leader goes down or entire watch proxy is terminated
  106. go func() {
  107. defer func() { stopc <- struct{}{} }()
  108. select {
  109. case <-lostLeaderC:
  110. case <-ctx.Done():
  111. case <-wp.ctx.Done():
  112. }
  113. }()
  114. <-stopc
  115. cancel()
  116. // recv/send may only shutdown after function exits;
  117. // goroutine notifies proxy that stream is through
  118. go func() {
  119. <-stopc
  120. <-stopc
  121. wps.close()
  122. wp.wg.Done()
  123. }()
  124. select {
  125. case <-lostLeaderC:
  126. return rpctypes.ErrNoLeader
  127. case <-wp.leader.disconnectNotify():
  128. return grpc.ErrClientConnClosing
  129. default:
  130. return wps.ctx.Err()
  131. }
  132. }
  133. // watchProxyStream forwards etcd watch events to a proxied client stream.
  134. type watchProxyStream struct {
  135. ranges *watchRanges
  136. // mu protects watchers and nextWatcherID
  137. mu sync.Mutex
  138. // watchers receive events from watch broadcast.
  139. watchers map[int64]*watcher
  140. // nextWatcherID is the id to assign the next watcher on this stream.
  141. nextWatcherID int64
  142. stream pb.Watch_WatchServer
  143. // watchCh receives watch responses from the watchers.
  144. watchCh chan *pb.WatchResponse
  145. ctx context.Context
  146. cancel context.CancelFunc
  147. }
  148. func (wps *watchProxyStream) close() {
  149. var wg sync.WaitGroup
  150. wps.cancel()
  151. wps.mu.Lock()
  152. wg.Add(len(wps.watchers))
  153. for _, wpsw := range wps.watchers {
  154. go func(w *watcher) {
  155. wps.ranges.delete(w)
  156. wg.Done()
  157. }(wpsw)
  158. }
  159. wps.watchers = nil
  160. wps.mu.Unlock()
  161. wg.Wait()
  162. close(wps.watchCh)
  163. }
  164. func (wps *watchProxyStream) recvLoop() error {
  165. for {
  166. req, err := wps.stream.Recv()
  167. if err != nil {
  168. return err
  169. }
  170. switch uv := req.RequestUnion.(type) {
  171. case *pb.WatchRequest_CreateRequest:
  172. cr := uv.CreateRequest
  173. w := &watcher{
  174. wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
  175. id: wps.nextWatcherID,
  176. wps: wps,
  177. nextrev: cr.StartRevision,
  178. progress: cr.ProgressNotify,
  179. prevKV: cr.PrevKv,
  180. filters: v3rpc.FiltersFromRequest(cr),
  181. }
  182. if !w.wr.valid() {
  183. w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
  184. continue
  185. }
  186. wps.nextWatcherID++
  187. w.nextrev = cr.StartRevision
  188. wps.watchers[w.id] = w
  189. wps.ranges.add(w)
  190. case *pb.WatchRequest_CancelRequest:
  191. wps.delete(uv.CancelRequest.WatchId)
  192. default:
  193. panic("not implemented")
  194. }
  195. }
  196. }
  197. func (wps *watchProxyStream) sendLoop() {
  198. for {
  199. select {
  200. case wresp, ok := <-wps.watchCh:
  201. if !ok {
  202. return
  203. }
  204. if err := wps.stream.Send(wresp); err != nil {
  205. return
  206. }
  207. case <-wps.ctx.Done():
  208. return
  209. }
  210. }
  211. }
  212. func (wps *watchProxyStream) delete(id int64) {
  213. wps.mu.Lock()
  214. defer wps.mu.Unlock()
  215. w, ok := wps.watchers[id]
  216. if !ok {
  217. return
  218. }
  219. wps.ranges.delete(w)
  220. delete(wps.watchers, id)
  221. resp := &pb.WatchResponse{
  222. Header: &w.lastHeader,
  223. WatchId: id,
  224. Canceled: true,
  225. }
  226. wps.watchCh <- resp
  227. }