watch.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "context"
  17. "sync"
  18. "go.etcd.io/etcd/clientv3"
  19. "go.etcd.io/etcd/etcdserver/api/v3rpc"
  20. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  22. "google.golang.org/grpc/codes"
  23. "google.golang.org/grpc/metadata"
  24. "google.golang.org/grpc/status"
  25. )
  26. type watchProxy struct {
  27. cw clientv3.Watcher
  28. ctx context.Context
  29. leader *leader
  30. ranges *watchRanges
  31. // mu protects adding outstanding watch servers through wg.
  32. mu sync.Mutex
  33. // wg waits until all outstanding watch servers quit.
  34. wg sync.WaitGroup
  35. // kv is used for permission checking
  36. kv clientv3.KV
  37. }
  38. func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
  39. cctx, cancel := context.WithCancel(c.Ctx())
  40. wp := &watchProxy{
  41. cw: c.Watcher,
  42. ctx: cctx,
  43. leader: newLeader(c.Ctx(), c.Watcher),
  44. kv: c.KV, // for permission checking
  45. }
  46. wp.ranges = newWatchRanges(wp)
  47. ch := make(chan struct{})
  48. go func() {
  49. defer close(ch)
  50. <-wp.leader.stopNotify()
  51. wp.mu.Lock()
  52. select {
  53. case <-wp.ctx.Done():
  54. case <-wp.leader.disconnectNotify():
  55. cancel()
  56. }
  57. <-wp.ctx.Done()
  58. wp.mu.Unlock()
  59. wp.wg.Wait()
  60. wp.ranges.stop()
  61. }()
  62. return wp, ch
  63. }
  64. func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
  65. wp.mu.Lock()
  66. select {
  67. case <-wp.ctx.Done():
  68. wp.mu.Unlock()
  69. select {
  70. case <-wp.leader.disconnectNotify():
  71. return status.Error(codes.Canceled, "the client connection is closing")
  72. default:
  73. return wp.ctx.Err()
  74. }
  75. default:
  76. wp.wg.Add(1)
  77. }
  78. wp.mu.Unlock()
  79. ctx, cancel := context.WithCancel(stream.Context())
  80. wps := &watchProxyStream{
  81. ranges: wp.ranges,
  82. watchers: make(map[int64]*watcher),
  83. stream: stream,
  84. watchCh: make(chan *pb.WatchResponse, 1024),
  85. ctx: ctx,
  86. cancel: cancel,
  87. kv: wp.kv,
  88. }
  89. var lostLeaderC <-chan struct{}
  90. if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
  91. v := md[rpctypes.MetadataRequireLeaderKey]
  92. if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
  93. lostLeaderC = wp.leader.lostNotify()
  94. // if leader is known to be lost at creation time, avoid
  95. // letting events through at all
  96. select {
  97. case <-lostLeaderC:
  98. wp.wg.Done()
  99. return rpctypes.ErrNoLeader
  100. default:
  101. }
  102. }
  103. }
  104. // post to stopc => terminate server stream; can't use a waitgroup
  105. // since all goroutines will only terminate after Watch() exits.
  106. stopc := make(chan struct{}, 3)
  107. go func() {
  108. defer func() { stopc <- struct{}{} }()
  109. wps.recvLoop()
  110. }()
  111. go func() {
  112. defer func() { stopc <- struct{}{} }()
  113. wps.sendLoop()
  114. }()
  115. // tear down watch if leader goes down or entire watch proxy is terminated
  116. go func() {
  117. defer func() { stopc <- struct{}{} }()
  118. select {
  119. case <-lostLeaderC:
  120. case <-ctx.Done():
  121. case <-wp.ctx.Done():
  122. }
  123. }()
  124. <-stopc
  125. cancel()
  126. // recv/send may only shutdown after function exits;
  127. // goroutine notifies proxy that stream is through
  128. go func() {
  129. <-stopc
  130. <-stopc
  131. wps.close()
  132. wp.wg.Done()
  133. }()
  134. select {
  135. case <-lostLeaderC:
  136. return rpctypes.ErrNoLeader
  137. case <-wp.leader.disconnectNotify():
  138. return status.Error(codes.Canceled, "the client connection is closing")
  139. default:
  140. return wps.ctx.Err()
  141. }
  142. }
  143. // watchProxyStream forwards etcd watch events to a proxied client stream.
  144. type watchProxyStream struct {
  145. ranges *watchRanges
  146. // mu protects watchers and nextWatcherID
  147. mu sync.Mutex
  148. // watchers receive events from watch broadcast.
  149. watchers map[int64]*watcher
  150. // nextWatcherID is the id to assign the next watcher on this stream.
  151. nextWatcherID int64
  152. stream pb.Watch_WatchServer
  153. // watchCh receives watch responses from the watchers.
  154. watchCh chan *pb.WatchResponse
  155. ctx context.Context
  156. cancel context.CancelFunc
  157. // kv is used for permission checking
  158. kv clientv3.KV
  159. }
  160. func (wps *watchProxyStream) close() {
  161. var wg sync.WaitGroup
  162. wps.cancel()
  163. wps.mu.Lock()
  164. wg.Add(len(wps.watchers))
  165. for _, wpsw := range wps.watchers {
  166. go func(w *watcher) {
  167. wps.ranges.delete(w)
  168. wg.Done()
  169. }(wpsw)
  170. }
  171. wps.watchers = nil
  172. wps.mu.Unlock()
  173. wg.Wait()
  174. close(wps.watchCh)
  175. }
  176. func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
  177. if len(key) == 0 {
  178. // If the length of the key is 0, we need to obtain full range.
  179. // look at clientv3.WithPrefix()
  180. key = []byte{0}
  181. rangeEnd = []byte{0}
  182. }
  183. req := &pb.RangeRequest{
  184. Serializable: true,
  185. Key: key,
  186. RangeEnd: rangeEnd,
  187. CountOnly: true,
  188. Limit: 1,
  189. }
  190. _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
  191. return err
  192. }
  193. func (wps *watchProxyStream) recvLoop() error {
  194. for {
  195. req, err := wps.stream.Recv()
  196. if err != nil {
  197. return err
  198. }
  199. switch uv := req.RequestUnion.(type) {
  200. case *pb.WatchRequest_CreateRequest:
  201. cr := uv.CreateRequest
  202. if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
  203. wps.watchCh <- &pb.WatchResponse{
  204. Header: &pb.ResponseHeader{},
  205. WatchId: -1,
  206. Created: true,
  207. Canceled: true,
  208. CancelReason: err.Error(),
  209. }
  210. continue
  211. }
  212. wps.mu.Lock()
  213. w := &watcher{
  214. wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
  215. id: wps.nextWatcherID,
  216. wps: wps,
  217. nextrev: cr.StartRevision,
  218. progress: cr.ProgressNotify,
  219. prevKV: cr.PrevKv,
  220. filters: v3rpc.FiltersFromRequest(cr),
  221. }
  222. if !w.wr.valid() {
  223. w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
  224. continue
  225. }
  226. wps.nextWatcherID++
  227. w.nextrev = cr.StartRevision
  228. wps.watchers[w.id] = w
  229. wps.ranges.add(w)
  230. wps.mu.Unlock()
  231. case *pb.WatchRequest_CancelRequest:
  232. wps.delete(uv.CancelRequest.WatchId)
  233. default:
  234. panic("not implemented")
  235. }
  236. }
  237. }
  238. func (wps *watchProxyStream) sendLoop() {
  239. for {
  240. select {
  241. case wresp, ok := <-wps.watchCh:
  242. if !ok {
  243. return
  244. }
  245. if err := wps.stream.Send(wresp); err != nil {
  246. return
  247. }
  248. case <-wps.ctx.Done():
  249. return
  250. }
  251. }
  252. }
  253. func (wps *watchProxyStream) delete(id int64) {
  254. wps.mu.Lock()
  255. defer wps.mu.Unlock()
  256. w, ok := wps.watchers[id]
  257. if !ok {
  258. return
  259. }
  260. wps.ranges.delete(w)
  261. delete(wps.watchers, id)
  262. resp := &pb.WatchResponse{
  263. Header: &w.lastHeader,
  264. WatchId: id,
  265. Canceled: true,
  266. }
  267. wps.watchCh <- resp
  268. }