watcher.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mvcc
  15. import (
  16. "bytes"
  17. "errors"
  18. "sync"
  19. "go.etcd.io/etcd/mvcc/mvccpb"
  20. )
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If passed, an ID will automatically be
// assigned.
const AutoWatchID WatchID = 0
// Sentinel errors returned by the watch stream.
var (
	// ErrWatcherNotExist is returned by Cancel when no watcher is registered
	// under the given ID or the stream has already been closed.
	ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
	// ErrEmptyWatcherRange is returned by Watch when a non-empty range end
	// is not lexicographically greater than the key. The watchStream
	// implementation in this file also returns it when Watch is called on a
	// closed stream.
	ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
	// ErrWatcherDuplicateID is returned by Watch when the caller supplies an
	// explicit watch ID that is already in use on the stream.
	ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)
// WatchID identifies a watcher within a WatchStream. The value AutoWatchID
// is reserved for requesting an automatically assigned ID.
type WatchID int64
// FilterFunc returns true if the given event should be filtered out.
// Zero or more FilterFuncs can be supplied when creating a watcher through
// WatchStream.Watch.
type FilterFunc func(e mvccpb.Event) bool
// WatchStream manages a group of watchers whose responses are all delivered
// through a single channel.
type WatchStream interface {
	// Watch creates a watcher. The watcher watches the events happening or
	// happened on the given key or range [key, end) from the given startRev.
	//
	// The whole event history can be watched unless compacted.
	// If "startRev" <= 0, watch observes events after currentRev.
	//
	// The returned "id" is the ID of this watcher. It appears as WatchID
	// in events that are sent to the created watcher through stream channel.
	// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
	// an auto-generated watch ID is returned.
	Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
	// Chan returns a chan. All watch responses will be sent to the returned chan.
	Chan() <-chan WatchResponse
	// RequestProgress requests the progress of the watcher with given ID. The response
	// will only be sent if the watcher is currently synced.
	// The responses will be sent through the WatchResponse Chan attached
	// with this stream to ensure correct ordering.
	// The responses contain no events. The revision in the response is the progress
	// of the watchers since the watcher is currently synced.
	RequestProgress(id WatchID)
	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
	// returned.
	Cancel(id WatchID) error
	// Close closes Chan and releases all related resources.
	Close()
	// Rev returns the current revision of the KV the stream watches on.
	Rev() int64
}
// WatchResponse is the message type delivered on the channel returned by
// WatchStream.Chan.
type WatchResponse struct {
	// WatchID is the WatchID of the watcher this response is sent to.
	WatchID WatchID
	// Events contains all the events that need to be sent.
	Events []mvccpb.Event
	// Revision is the revision of the KV when the watchResponse is created.
	// For a normal response, the revision should be the same as the last
	// modified revision inside Events. For a delayed response to an unsynced
	// watcher, the revision is greater than the last modified revision
	// inside Events.
	Revision int64
	// CompactRevision is set when the watcher is cancelled due to compaction.
	CompactRevision int64
}
// watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events.
type watchStream struct {
	// watchable is the backend used to create watchers, read the current
	// revision and request watcher progress.
	watchable watchable
	// ch is the shared response channel; it is handed to every watcher
	// created by Watch, exposed through Chan and closed by Close.
	ch chan WatchResponse

	mu sync.Mutex // guards fields below it
	// nextID is the ID pre-allocated for next new watcher in this stream
	nextID WatchID
	// closed is set by Close; Watch and Cancel fail once it is true.
	closed bool
	// cancels maps a watch ID to the cancel function returned when the
	// watcher was created.
	cancels map[WatchID]cancelFunc
	// watchers maps a watch ID to the watcher registered under it.
	watchers map[WatchID]*watcher
}
  87. // Watch creates a new watcher in the stream and returns its WatchID.
  88. func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
  89. // prevent wrong range where key >= end lexicographically
  90. // watch request with 'WithFromKey' has empty-byte range end
  91. if len(end) != 0 && bytes.Compare(key, end) != -1 {
  92. return -1, ErrEmptyWatcherRange
  93. }
  94. ws.mu.Lock()
  95. defer ws.mu.Unlock()
  96. if ws.closed {
  97. return -1, ErrEmptyWatcherRange
  98. }
  99. if id == AutoWatchID {
  100. for ws.watchers[ws.nextID] != nil {
  101. ws.nextID++
  102. }
  103. id = ws.nextID
  104. ws.nextID++
  105. } else if _, ok := ws.watchers[id]; ok {
  106. return -1, ErrWatcherDuplicateID
  107. }
  108. w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
  109. ws.cancels[id] = c
  110. ws.watchers[id] = w
  111. return id, nil
  112. }
// Chan returns the receive-only view of the stream's shared response
// channel, the same channel that is handed to every watcher created by Watch.
func (ws *watchStream) Chan() <-chan WatchResponse {
	return ws.ch
}
  116. func (ws *watchStream) Cancel(id WatchID) error {
  117. ws.mu.Lock()
  118. cancel, ok := ws.cancels[id]
  119. w := ws.watchers[id]
  120. ok = ok && !ws.closed
  121. ws.mu.Unlock()
  122. if !ok {
  123. return ErrWatcherNotExist
  124. }
  125. cancel()
  126. ws.mu.Lock()
  127. // The watch isn't removed until cancel so that if Close() is called,
  128. // it will wait for the cancel. Otherwise, Close() could close the
  129. // watch channel while the store is still posting events.
  130. if ww := ws.watchers[id]; ww == w {
  131. delete(ws.cancels, id)
  132. delete(ws.watchers, id)
  133. }
  134. ws.mu.Unlock()
  135. return nil
  136. }
  137. func (ws *watchStream) Close() {
  138. ws.mu.Lock()
  139. defer ws.mu.Unlock()
  140. for _, cancel := range ws.cancels {
  141. cancel()
  142. }
  143. ws.closed = true
  144. close(ws.ch)
  145. watchStreamGauge.Dec()
  146. }
  147. func (ws *watchStream) Rev() int64 {
  148. ws.mu.Lock()
  149. defer ws.mu.Unlock()
  150. return ws.watchable.rev()
  151. }
  152. func (ws *watchStream) RequestProgress(id WatchID) {
  153. ws.mu.Lock()
  154. w, ok := ws.watchers[id]
  155. ws.mu.Unlock()
  156. if !ok {
  157. return
  158. }
  159. ws.watchable.progress(w)
  160. }