watcher_group.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mvcc
  15. import (
  16. "fmt"
  17. "math"
  18. "github.com/coreos/etcd/mvcc/mvccpb"
  19. "github.com/coreos/etcd/pkg/adt"
  20. )
  21. var (
  22. // watchBatchMaxRevs is the maximum distinct revisions that
  23. // may be sent to an unsynced watcher at a time. Declared as
  24. // var instead of const for testing purposes.
  25. watchBatchMaxRevs = 1000
  26. )
  27. type eventBatch struct {
  28. // evs is a batch of revision-ordered events
  29. evs []mvccpb.Event
  30. // revs is the minimum unique revisions observed for this batch
  31. revs int
  32. // moreRev is first revision with more events following this batch
  33. moreRev int64
  34. }
  35. func (eb *eventBatch) add(ev mvccpb.Event) {
  36. if eb.revs > watchBatchMaxRevs {
  37. // maxed out batch size
  38. return
  39. }
  40. if len(eb.evs) == 0 {
  41. // base case
  42. eb.revs = 1
  43. eb.evs = append(eb.evs, ev)
  44. return
  45. }
  46. // revision accounting
  47. ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
  48. evRev := ev.Kv.ModRevision
  49. if evRev > ebRev {
  50. eb.revs++
  51. if eb.revs > watchBatchMaxRevs {
  52. eb.moreRev = evRev
  53. return
  54. }
  55. }
  56. eb.evs = append(eb.evs, ev)
  57. }
  58. type watcherBatch map[*watcher]*eventBatch
  59. func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
  60. eb := wb[w]
  61. if eb == nil {
  62. eb = &eventBatch{}
  63. wb[w] = eb
  64. }
  65. eb.add(ev)
  66. }
  67. // newWatcherBatch maps watchers to their matched events. It enables quick
  68. // events look up by watcher.
  69. func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
  70. if len(wg.watchers) == 0 {
  71. return nil
  72. }
  73. wb := make(watcherBatch)
  74. for _, ev := range evs {
  75. for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
  76. if ev.Kv.ModRevision >= w.minRev {
  77. // don't double notify
  78. wb.add(w, ev)
  79. }
  80. }
  81. }
  82. return wb
  83. }
  84. type watcherSet map[*watcher]struct{}
  85. func (w watcherSet) add(wa *watcher) {
  86. if _, ok := w[wa]; ok {
  87. panic("add watcher twice!")
  88. }
  89. w[wa] = struct{}{}
  90. }
  91. func (w watcherSet) union(ws watcherSet) {
  92. for wa := range ws {
  93. w.add(wa)
  94. }
  95. }
  96. func (w watcherSet) delete(wa *watcher) {
  97. if _, ok := w[wa]; !ok {
  98. panic("removing missing watcher!")
  99. }
  100. delete(w, wa)
  101. }
  102. type watcherSetByKey map[string]watcherSet
  103. func (w watcherSetByKey) add(wa *watcher) {
  104. set := w[string(wa.key)]
  105. if set == nil {
  106. set = make(watcherSet)
  107. w[string(wa.key)] = set
  108. }
  109. set.add(wa)
  110. }
  111. func (w watcherSetByKey) delete(wa *watcher) bool {
  112. k := string(wa.key)
  113. if v, ok := w[k]; ok {
  114. if _, ok := v[wa]; ok {
  115. delete(v, wa)
  116. if len(v) == 0 {
  117. // remove the set; nothing left
  118. delete(w, k)
  119. }
  120. return true
  121. }
  122. }
  123. return false
  124. }
  125. // watcherGroup is a collection of watchers organized by their ranges
  126. type watcherGroup struct {
  127. // keyWatchers has the watchers that watch on a single key
  128. keyWatchers watcherSetByKey
  129. // ranges has the watchers that watch a range; it is sorted by interval
  130. ranges adt.IntervalTree
  131. // watchers is the set of all watchers
  132. watchers watcherSet
  133. }
  134. func newWatcherGroup() watcherGroup {
  135. return watcherGroup{
  136. keyWatchers: make(watcherSetByKey),
  137. watchers: make(watcherSet),
  138. }
  139. }
  140. // add puts a watcher in the group.
  141. func (wg *watcherGroup) add(wa *watcher) {
  142. wg.watchers.add(wa)
  143. if wa.end == nil {
  144. wg.keyWatchers.add(wa)
  145. return
  146. }
  147. // interval already registered?
  148. ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
  149. if iv := wg.ranges.Find(ivl); iv != nil {
  150. iv.Val.(watcherSet).add(wa)
  151. return
  152. }
  153. // not registered, put in interval tree
  154. ws := make(watcherSet)
  155. ws.add(wa)
  156. wg.ranges.Insert(ivl, ws)
  157. }
  158. // contains is whether the given key has a watcher in the group.
  159. func (wg *watcherGroup) contains(key string) bool {
  160. _, ok := wg.keyWatchers[key]
  161. return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key))
  162. }
  163. // size gives the number of unique watchers in the group.
  164. func (wg *watcherGroup) size() int { return len(wg.watchers) }
  165. // delete removes a watcher from the group.
  166. func (wg *watcherGroup) delete(wa *watcher) bool {
  167. if _, ok := wg.watchers[wa]; !ok {
  168. return false
  169. }
  170. wg.watchers.delete(wa)
  171. if wa.end == nil {
  172. wg.keyWatchers.delete(wa)
  173. return true
  174. }
  175. ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
  176. iv := wg.ranges.Find(ivl)
  177. if iv == nil {
  178. return false
  179. }
  180. ws := iv.Val.(watcherSet)
  181. delete(ws, wa)
  182. if len(ws) == 0 {
  183. // remove interval missing watchers
  184. if ok := wg.ranges.Delete(ivl); !ok {
  185. panic("could not remove watcher from interval tree")
  186. }
  187. }
  188. return true
  189. }
  190. // choose selects watchers from the watcher group to update
  191. func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
  192. if len(wg.watchers) < maxWatchers {
  193. return wg, wg.chooseAll(curRev, compactRev)
  194. }
  195. ret := newWatcherGroup()
  196. for w := range wg.watchers {
  197. if maxWatchers <= 0 {
  198. break
  199. }
  200. maxWatchers--
  201. ret.add(w)
  202. }
  203. return &ret, ret.chooseAll(curRev, compactRev)
  204. }
  205. func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
  206. minRev := int64(math.MaxInt64)
  207. for w := range wg.watchers {
  208. if w.minRev > curRev {
  209. // after network partition, possibly choosing future revision watcher from restore operation
  210. // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
  211. // do not panic when such watcher had been moved from "synced" watcher during restore operation
  212. if !w.restore {
  213. panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
  214. }
  215. // mark 'restore' done, since it's chosen
  216. w.restore = false
  217. }
  218. if w.minRev < compactRev {
  219. select {
  220. case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
  221. w.compacted = true
  222. wg.delete(w)
  223. default:
  224. // retry next time
  225. }
  226. continue
  227. }
  228. if minRev > w.minRev {
  229. minRev = w.minRev
  230. }
  231. }
  232. return minRev
  233. }
  234. // watcherSetByKey gets the set of watchers that receive events on the given key.
  235. func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
  236. wkeys := wg.keyWatchers[key]
  237. wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
  238. // zero-copy cases
  239. switch {
  240. case len(wranges) == 0:
  241. // no need to merge ranges or copy; reuse single-key set
  242. return wkeys
  243. case len(wranges) == 0 && len(wkeys) == 0:
  244. return nil
  245. case len(wranges) == 1 && len(wkeys) == 0:
  246. return wranges[0].Val.(watcherSet)
  247. }
  248. // copy case
  249. ret := make(watcherSet)
  250. ret.union(wg.keyWatchers[key])
  251. for _, item := range wranges {
  252. ret.union(item.Val.(watcherSet))
  253. }
  254. return ret
  255. }