watcher_group.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mvcc
  15. import (
  16. "math"
  17. "github.com/coreos/etcd/mvcc/mvccpb"
  18. "github.com/coreos/etcd/pkg/adt"
  19. )
  20. var (
  21. // watchBatchMaxRevs is the maximum distinct revisions that
  22. // may be sent to an unsynced watcher at a time. Declared as
  23. // var instead of const for testing purposes.
  24. watchBatchMaxRevs = 1000
  25. )
  26. type eventBatch struct {
  27. // evs is a batch of revision-ordered events
  28. evs []mvccpb.Event
  29. // revs is the minimum unique revisions observed for this batch
  30. revs int
  31. // moreRev is first revision with more events following this batch
  32. moreRev int64
  33. }
  34. func (eb *eventBatch) add(ev mvccpb.Event) {
  35. if eb.revs > watchBatchMaxRevs {
  36. // maxed out batch size
  37. return
  38. }
  39. if len(eb.evs) == 0 {
  40. // base case
  41. eb.revs = 1
  42. eb.evs = append(eb.evs, ev)
  43. return
  44. }
  45. // revision accounting
  46. ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
  47. evRev := ev.Kv.ModRevision
  48. if evRev > ebRev {
  49. eb.revs++
  50. if eb.revs > watchBatchMaxRevs {
  51. eb.moreRev = evRev
  52. return
  53. }
  54. }
  55. eb.evs = append(eb.evs, ev)
  56. }
  57. type watcherBatch map[*watcher]*eventBatch
  58. func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
  59. eb := wb[w]
  60. if eb == nil {
  61. eb = &eventBatch{}
  62. wb[w] = eb
  63. }
  64. eb.add(ev)
  65. }
  66. // newWatcherBatch maps watchers to their matched events. It enables quick
  67. // events look up by watcher.
  68. func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
  69. if len(wg.watchers) == 0 {
  70. return nil
  71. }
  72. wb := make(watcherBatch)
  73. for _, ev := range evs {
  74. for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
  75. if ev.Kv.ModRevision >= w.minRev {
  76. // don't double notify
  77. wb.add(w, ev)
  78. }
  79. }
  80. }
  81. return wb
  82. }
  83. type watcherSet map[*watcher]struct{}
  84. func (w watcherSet) add(wa *watcher) {
  85. if _, ok := w[wa]; ok {
  86. panic("add watcher twice!")
  87. }
  88. w[wa] = struct{}{}
  89. }
  90. func (w watcherSet) union(ws watcherSet) {
  91. for wa := range ws {
  92. w.add(wa)
  93. }
  94. }
  95. func (w watcherSet) delete(wa *watcher) {
  96. if _, ok := w[wa]; !ok {
  97. panic("removing missing watcher!")
  98. }
  99. delete(w, wa)
  100. }
  101. type watcherSetByKey map[string]watcherSet
  102. func (w watcherSetByKey) add(wa *watcher) {
  103. set := w[string(wa.key)]
  104. if set == nil {
  105. set = make(watcherSet)
  106. w[string(wa.key)] = set
  107. }
  108. set.add(wa)
  109. }
  110. func (w watcherSetByKey) delete(wa *watcher) bool {
  111. k := string(wa.key)
  112. if v, ok := w[k]; ok {
  113. if _, ok := v[wa]; ok {
  114. delete(v, wa)
  115. if len(v) == 0 {
  116. // remove the set; nothing left
  117. delete(w, k)
  118. }
  119. return true
  120. }
  121. }
  122. return false
  123. }
  124. // watcherGroup is a collection of watchers organized by their ranges
  125. type watcherGroup struct {
  126. // keyWatchers has the watchers that watch on a single key
  127. keyWatchers watcherSetByKey
  128. // ranges has the watchers that watch a range; it is sorted by interval
  129. ranges adt.IntervalTree
  130. // watchers is the set of all watchers
  131. watchers watcherSet
  132. }
  133. func newWatcherGroup() watcherGroup {
  134. return watcherGroup{
  135. keyWatchers: make(watcherSetByKey),
  136. watchers: make(watcherSet),
  137. }
  138. }
  139. // add puts a watcher in the group.
  140. func (wg *watcherGroup) add(wa *watcher) {
  141. wg.watchers.add(wa)
  142. if wa.end == nil {
  143. wg.keyWatchers.add(wa)
  144. return
  145. }
  146. // interval already registered?
  147. ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
  148. if iv := wg.ranges.Find(ivl); iv != nil {
  149. iv.Val.(watcherSet).add(wa)
  150. return
  151. }
  152. // not registered, put in interval tree
  153. ws := make(watcherSet)
  154. ws.add(wa)
  155. wg.ranges.Insert(ivl, ws)
  156. }
  157. // contains is whether the given key has a watcher in the group.
  158. func (wg *watcherGroup) contains(key string) bool {
  159. _, ok := wg.keyWatchers[key]
  160. return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key))
  161. }
  162. // size gives the number of unique watchers in the group.
  163. func (wg *watcherGroup) size() int { return len(wg.watchers) }
  164. // delete removes a watcher from the group.
  165. func (wg *watcherGroup) delete(wa *watcher) bool {
  166. if _, ok := wg.watchers[wa]; !ok {
  167. return false
  168. }
  169. wg.watchers.delete(wa)
  170. if wa.end == nil {
  171. wg.keyWatchers.delete(wa)
  172. return true
  173. }
  174. ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
  175. iv := wg.ranges.Find(ivl)
  176. if iv == nil {
  177. return false
  178. }
  179. ws := iv.Val.(watcherSet)
  180. delete(ws, wa)
  181. if len(ws) == 0 {
  182. // remove interval missing watchers
  183. if ok := wg.ranges.Delete(ivl); !ok {
  184. panic("could not remove watcher from interval tree")
  185. }
  186. }
  187. return true
  188. }
  189. // choose selects watchers from the watcher group to update
  190. func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
  191. if len(wg.watchers) < maxWatchers {
  192. return wg, wg.chooseAll(curRev, compactRev)
  193. }
  194. ret := newWatcherGroup()
  195. for w := range wg.watchers {
  196. if maxWatchers <= 0 {
  197. break
  198. }
  199. maxWatchers--
  200. ret.add(w)
  201. }
  202. return &ret, ret.chooseAll(curRev, compactRev)
  203. }
  204. func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
  205. minRev := int64(math.MaxInt64)
  206. for w := range wg.watchers {
  207. if w.minRev > curRev {
  208. panic("watcher current revision should not exceed current revision")
  209. }
  210. if w.minRev < compactRev {
  211. select {
  212. case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
  213. w.compacted = true
  214. wg.delete(w)
  215. default:
  216. // retry next time
  217. }
  218. continue
  219. }
  220. if minRev > w.minRev {
  221. minRev = w.minRev
  222. }
  223. }
  224. return minRev
  225. }
  226. // watcherSetByKey gets the set of watchers that receive events on the given key.
  227. func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
  228. wkeys := wg.keyWatchers[key]
  229. wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
  230. // zero-copy cases
  231. switch {
  232. case len(wranges) == 0:
  233. // no need to merge ranges or copy; reuse single-key set
  234. return wkeys
  235. case len(wranges) == 0 && len(wkeys) == 0:
  236. return nil
  237. case len(wranges) == 1 && len(wkeys) == 0:
  238. return wranges[0].Val.(watcherSet)
  239. }
  240. // copy case
  241. ret := make(watcherSet)
  242. ret.union(wg.keyWatchers[key])
  243. for _, item := range wranges {
  244. ret.union(item.Val.(watcherSet))
  245. }
  246. return ret
  247. }