watch.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "context"
  17. "sync"
  18. "github.com/coreos/etcd/clientv3"
  19. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/metadata"
  24. )
  25. type watchProxy struct {
  26. cw clientv3.Watcher
  27. ctx context.Context
  28. leader *leader
  29. ranges *watchRanges
  30. // mu protects adding outstanding watch servers through wg.
  31. mu sync.Mutex
  32. // wg waits until all outstanding watch servers quit.
  33. wg sync.WaitGroup
  34. // kv is used for permission checking
  35. kv clientv3.KV
  36. }
  37. func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
  38. cctx, cancel := context.WithCancel(c.Ctx())
  39. wp := &watchProxy{
  40. cw: c.Watcher,
  41. ctx: cctx,
  42. leader: newLeader(c.Ctx(), c.Watcher),
  43. kv: c.KV, // for permission checking
  44. }
  45. wp.ranges = newWatchRanges(wp)
  46. ch := make(chan struct{})
  47. go func() {
  48. defer close(ch)
  49. <-wp.leader.stopNotify()
  50. wp.mu.Lock()
  51. select {
  52. case <-wp.ctx.Done():
  53. case <-wp.leader.disconnectNotify():
  54. cancel()
  55. }
  56. <-wp.ctx.Done()
  57. wp.mu.Unlock()
  58. wp.wg.Wait()
  59. wp.ranges.stop()
  60. }()
  61. return wp, ch
  62. }
  63. func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
  64. wp.mu.Lock()
  65. select {
  66. case <-wp.ctx.Done():
  67. wp.mu.Unlock()
  68. select {
  69. case <-wp.leader.disconnectNotify():
  70. return grpc.ErrClientConnClosing
  71. default:
  72. return wp.ctx.Err()
  73. }
  74. default:
  75. wp.wg.Add(1)
  76. }
  77. wp.mu.Unlock()
  78. ctx, cancel := context.WithCancel(stream.Context())
  79. wps := &watchProxyStream{
  80. ranges: wp.ranges,
  81. watchers: make(map[int64]*watcher),
  82. stream: stream,
  83. watchCh: make(chan *pb.WatchResponse, 1024),
  84. ctx: ctx,
  85. cancel: cancel,
  86. kv: wp.kv,
  87. }
  88. var lostLeaderC <-chan struct{}
  89. if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
  90. v := md[rpctypes.MetadataRequireLeaderKey]
  91. if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
  92. lostLeaderC = wp.leader.lostNotify()
  93. // if leader is known to be lost at creation time, avoid
  94. // letting events through at all
  95. select {
  96. case <-lostLeaderC:
  97. wp.wg.Done()
  98. return rpctypes.ErrNoLeader
  99. default:
  100. }
  101. }
  102. }
  103. // post to stopc => terminate server stream; can't use a waitgroup
  104. // since all goroutines will only terminate after Watch() exits.
  105. stopc := make(chan struct{}, 3)
  106. go func() {
  107. defer func() { stopc <- struct{}{} }()
  108. wps.recvLoop()
  109. }()
  110. go func() {
  111. defer func() { stopc <- struct{}{} }()
  112. wps.sendLoop()
  113. }()
  114. // tear down watch if leader goes down or entire watch proxy is terminated
  115. go func() {
  116. defer func() { stopc <- struct{}{} }()
  117. select {
  118. case <-lostLeaderC:
  119. case <-ctx.Done():
  120. case <-wp.ctx.Done():
  121. }
  122. }()
  123. <-stopc
  124. cancel()
  125. // recv/send may only shutdown after function exits;
  126. // goroutine notifies proxy that stream is through
  127. go func() {
  128. <-stopc
  129. <-stopc
  130. wps.close()
  131. wp.wg.Done()
  132. }()
  133. select {
  134. case <-lostLeaderC:
  135. return rpctypes.ErrNoLeader
  136. case <-wp.leader.disconnectNotify():
  137. return grpc.ErrClientConnClosing
  138. default:
  139. return wps.ctx.Err()
  140. }
  141. }
  142. // watchProxyStream forwards etcd watch events to a proxied client stream.
  143. type watchProxyStream struct {
  144. ranges *watchRanges
  145. // mu protects watchers and nextWatcherID
  146. mu sync.Mutex
  147. // watchers receive events from watch broadcast.
  148. watchers map[int64]*watcher
  149. // nextWatcherID is the id to assign the next watcher on this stream.
  150. nextWatcherID int64
  151. stream pb.Watch_WatchServer
  152. // watchCh receives watch responses from the watchers.
  153. watchCh chan *pb.WatchResponse
  154. ctx context.Context
  155. cancel context.CancelFunc
  156. // kv is used for permission checking
  157. kv clientv3.KV
  158. }
  159. func (wps *watchProxyStream) close() {
  160. var wg sync.WaitGroup
  161. wps.cancel()
  162. wps.mu.Lock()
  163. wg.Add(len(wps.watchers))
  164. for _, wpsw := range wps.watchers {
  165. go func(w *watcher) {
  166. wps.ranges.delete(w)
  167. wg.Done()
  168. }(wpsw)
  169. }
  170. wps.watchers = nil
  171. wps.mu.Unlock()
  172. wg.Wait()
  173. close(wps.watchCh)
  174. }
  175. func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
  176. if len(key) == 0 {
  177. // If the length of the key is 0, we need to obtain full range.
  178. // look at clientv3.WithPrefix()
  179. key = []byte{0}
  180. rangeEnd = []byte{0}
  181. }
  182. req := &pb.RangeRequest{
  183. Serializable: true,
  184. Key: key,
  185. RangeEnd: rangeEnd,
  186. CountOnly: true,
  187. Limit: 1,
  188. }
  189. _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
  190. return err
  191. }
  192. func (wps *watchProxyStream) recvLoop() error {
  193. for {
  194. req, err := wps.stream.Recv()
  195. if err != nil {
  196. return err
  197. }
  198. switch uv := req.RequestUnion.(type) {
  199. case *pb.WatchRequest_CreateRequest:
  200. cr := uv.CreateRequest
  201. if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
  202. // Return WatchResponse which is caused by permission checking if and only if
  203. // the error is permission denied. For other errors (e.g. timeout or connection closed),
  204. // the permission checking mechanism should do nothing for preserving error code.
  205. wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
  206. continue
  207. }
  208. w := &watcher{
  209. wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
  210. id: wps.nextWatcherID,
  211. wps: wps,
  212. nextrev: cr.StartRevision,
  213. progress: cr.ProgressNotify,
  214. prevKV: cr.PrevKv,
  215. filters: v3rpc.FiltersFromRequest(cr),
  216. }
  217. if !w.wr.valid() {
  218. w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
  219. continue
  220. }
  221. wps.nextWatcherID++
  222. w.nextrev = cr.StartRevision
  223. wps.watchers[w.id] = w
  224. wps.ranges.add(w)
  225. case *pb.WatchRequest_CancelRequest:
  226. wps.delete(uv.CancelRequest.WatchId)
  227. default:
  228. panic("not implemented")
  229. }
  230. }
  231. }
  232. func (wps *watchProxyStream) sendLoop() {
  233. for {
  234. select {
  235. case wresp, ok := <-wps.watchCh:
  236. if !ok {
  237. return
  238. }
  239. if err := wps.stream.Send(wresp); err != nil {
  240. return
  241. }
  242. case <-wps.ctx.Done():
  243. return
  244. }
  245. }
  246. }
  247. func (wps *watchProxyStream) delete(id int64) {
  248. wps.mu.Lock()
  249. defer wps.mu.Unlock()
  250. w, ok := wps.watchers[id]
  251. if !ok {
  252. return
  253. }
  254. wps.ranges.delete(w)
  255. delete(wps.watchers, id)
  256. resp := &pb.WatchResponse{
  257. Header: &w.lastHeader,
  258. WatchId: id,
  259. Canceled: true,
  260. }
  261. wps.watchCh <- resp
  262. }