watch.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
import (
	"errors"
	"sync"

	"golang.org/x/net/context"
	"golang.org/x/time/rate"
	"google.golang.org/grpc/metadata"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
  25. type watchProxy struct {
  26. cw clientv3.Watcher
  27. ctx context.Context
  28. ranges *watchRanges
  29. // retryLimiter controls the create watch retry rate on lost leaders.
  30. retryLimiter *rate.Limiter
  31. // mu protects leaderc updates.
  32. mu sync.RWMutex
  33. leaderc chan struct{}
  34. // wg waits until all outstanding watch servers quit.
  35. wg sync.WaitGroup
  36. }
  37. const (
  38. lostLeaderKey = "__lostleader" // watched to detect leader loss
  39. retryPerSecond = 10
  40. )
  41. func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
  42. wp := &watchProxy{
  43. cw: c.Watcher,
  44. ctx: c.Ctx(),
  45. retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
  46. leaderc: make(chan struct{}),
  47. }
  48. wp.ranges = newWatchRanges(wp)
  49. ch := make(chan struct{})
  50. go func() {
  51. defer close(ch)
  52. // a new streams without opening any watchers won't catch
  53. // a lost leader event, so have a special watch to monitor it
  54. rev := int64((uint64(1) << 63) - 2)
  55. lctx := clientv3.WithRequireLeader(wp.ctx)
  56. for wp.ctx.Err() == nil {
  57. wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
  58. for range wch {
  59. }
  60. wp.mu.Lock()
  61. close(wp.leaderc)
  62. wp.leaderc = make(chan struct{})
  63. wp.mu.Unlock()
  64. wp.retryLimiter.Wait(wp.ctx)
  65. }
  66. wp.mu.Lock()
  67. <-wp.ctx.Done()
  68. wp.mu.Unlock()
  69. wp.wg.Wait()
  70. wp.ranges.stop()
  71. }()
  72. return wp, ch
  73. }
  74. func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
  75. wp.mu.Lock()
  76. select {
  77. case <-wp.ctx.Done():
  78. wp.mu.Unlock()
  79. return
  80. default:
  81. wp.wg.Add(1)
  82. }
  83. wp.mu.Unlock()
  84. ctx, cancel := context.WithCancel(stream.Context())
  85. wps := &watchProxyStream{
  86. ranges: wp.ranges,
  87. watchers: make(map[int64]*watcher),
  88. stream: stream,
  89. watchCh: make(chan *pb.WatchResponse, 1024),
  90. ctx: ctx,
  91. cancel: cancel,
  92. }
  93. var leaderc <-chan struct{}
  94. if md, ok := metadata.FromContext(stream.Context()); ok {
  95. v := md[rpctypes.MetadataRequireLeaderKey]
  96. if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
  97. leaderc = wp.lostLeaderNotify()
  98. }
  99. }
  100. // post to stopc => terminate server stream; can't use a waitgroup
  101. // since all goroutines will only terminate after Watch() exits.
  102. stopc := make(chan struct{}, 3)
  103. go func() {
  104. defer func() { stopc <- struct{}{} }()
  105. wps.recvLoop()
  106. }()
  107. go func() {
  108. defer func() { stopc <- struct{}{} }()
  109. wps.sendLoop()
  110. }()
  111. // tear down watch if leader goes down or entire watch proxy is terminated
  112. go func() {
  113. defer func() { stopc <- struct{}{} }()
  114. select {
  115. case <-leaderc:
  116. case <-ctx.Done():
  117. case <-wp.ctx.Done():
  118. }
  119. }()
  120. <-stopc
  121. cancel()
  122. // recv/send may only shutdown after function exits;
  123. // goroutine notifies proxy that stream is through
  124. go func() {
  125. <-stopc
  126. <-stopc
  127. wps.close()
  128. wp.wg.Done()
  129. }()
  130. select {
  131. case <-leaderc:
  132. return rpctypes.ErrNoLeader
  133. default:
  134. return wps.ctx.Err()
  135. }
  136. }
  137. func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
  138. wp.mu.RLock()
  139. defer wp.mu.RUnlock()
  140. return wp.leaderc
  141. }
  142. // watchProxyStream forwards etcd watch events to a proxied client stream.
  143. type watchProxyStream struct {
  144. ranges *watchRanges
  145. // mu protects watchers and nextWatcherID
  146. mu sync.Mutex
  147. // watchers receive events from watch broadcast.
  148. watchers map[int64]*watcher
  149. // nextWatcherID is the id to assign the next watcher on this stream.
  150. nextWatcherID int64
  151. stream pb.Watch_WatchServer
  152. // watchCh receives watch responses from the watchers.
  153. watchCh chan *pb.WatchResponse
  154. ctx context.Context
  155. cancel context.CancelFunc
  156. }
  157. func (wps *watchProxyStream) close() {
  158. var wg sync.WaitGroup
  159. wps.cancel()
  160. wps.mu.Lock()
  161. wg.Add(len(wps.watchers))
  162. for _, wpsw := range wps.watchers {
  163. go func(w *watcher) {
  164. wps.ranges.delete(w)
  165. wg.Done()
  166. }(wpsw)
  167. }
  168. wps.watchers = nil
  169. wps.mu.Unlock()
  170. wg.Wait()
  171. close(wps.watchCh)
  172. }
  173. func (wps *watchProxyStream) recvLoop() error {
  174. for {
  175. req, err := wps.stream.Recv()
  176. if err != nil {
  177. return err
  178. }
  179. switch uv := req.RequestUnion.(type) {
  180. case *pb.WatchRequest_CreateRequest:
  181. cr := uv.CreateRequest
  182. w := &watcher{
  183. wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
  184. id: wps.nextWatcherID,
  185. wps: wps,
  186. nextrev: cr.StartRevision,
  187. progress: cr.ProgressNotify,
  188. prevKV: cr.PrevKv,
  189. filters: v3rpc.FiltersFromRequest(cr),
  190. }
  191. if !w.wr.valid() {
  192. w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
  193. continue
  194. }
  195. wps.nextWatcherID++
  196. w.nextrev = cr.StartRevision
  197. wps.watchers[w.id] = w
  198. wps.ranges.add(w)
  199. case *pb.WatchRequest_CancelRequest:
  200. wps.delete(uv.CancelRequest.WatchId)
  201. default:
  202. panic("not implemented")
  203. }
  204. }
  205. }
  206. func (wps *watchProxyStream) sendLoop() {
  207. for {
  208. select {
  209. case wresp, ok := <-wps.watchCh:
  210. if !ok {
  211. return
  212. }
  213. if err := wps.stream.Send(wresp); err != nil {
  214. return
  215. }
  216. case <-wps.ctx.Done():
  217. return
  218. }
  219. }
  220. }
  221. func (wps *watchProxyStream) delete(id int64) {
  222. wps.mu.Lock()
  223. defer wps.mu.Unlock()
  224. w, ok := wps.watchers[id]
  225. if !ok {
  226. return
  227. }
  228. wps.ranges.delete(w)
  229. delete(wps.watchers, id)
  230. resp := &pb.WatchResponse{
  231. Header: &w.lastHeader,
  232. WatchId: id,
  233. Canceled: true,
  234. }
  235. wps.watchCh <- resp
  236. }