watcher.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "path"
  16. "strconv"
  17. "strings"
  18. )
  19. //------------------------------------------------------------------------------
  20. //
  21. // Typedefs
  22. //
  23. //------------------------------------------------------------------------------
  24. // WatcherHub is where the client register its watcher
  25. type WatcherHub struct {
  26. watchers map[string][]*Watcher
  27. }
  28. // Currently watcher only contains a response channel
  29. type Watcher struct {
  30. C chan *Response
  31. }
  32. // Create a new watcherHub
  33. func newWatcherHub() *WatcherHub {
  34. w := new(WatcherHub)
  35. w.watchers = make(map[string][]*Watcher)
  36. return w
  37. }
  38. // Create a new watcher
  39. func NewWatcher() *Watcher {
  40. return &Watcher{C: make(chan *Response, 1)}
  41. }
  42. // Add a watcher to the watcherHub
  43. func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
  44. responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
  45. prefix = path.Clean("/" + prefix)
  46. if sinceIndex != 0 && sinceIndex >= responseStartIndex {
  47. for i := sinceIndex; i <= currentIndex; i++ {
  48. if checkResponse(prefix, i, resMap) {
  49. watcher.C <- resMap[strconv.FormatUint(i, 10)]
  50. return nil
  51. }
  52. }
  53. }
  54. _, ok := w.watchers[prefix]
  55. if !ok {
  56. w.watchers[prefix] = make([]*Watcher, 0)
  57. }
  58. w.watchers[prefix] = append(w.watchers[prefix], watcher)
  59. return nil
  60. }
  61. // Check if the response has what we are watching
  62. func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
  63. resp, ok := resMap[strconv.FormatUint(index, 10)]
  64. if !ok {
  65. // not storage system command
  66. return false
  67. } else {
  68. path := resp.Key
  69. if strings.HasPrefix(path, prefix) {
  70. prefixLen := len(prefix)
  71. if len(path) == prefixLen || path[prefixLen] == '/' {
  72. return true
  73. }
  74. }
  75. }
  76. return false
  77. }
  78. // Notify the watcher a action happened
  79. func (w *WatcherHub) notify(resp Response) error {
  80. resp.Key = path.Clean(resp.Key)
  81. segments := strings.Split(resp.Key, "/")
  82. currPath := "/"
  83. // walk through all the pathes
  84. for _, segment := range segments {
  85. currPath = path.Join(currPath, segment)
  86. watchers, ok := w.watchers[currPath]
  87. if ok {
  88. newWatchers := make([]*Watcher, 0)
  89. // notify all the watchers
  90. for _, watcher := range watchers {
  91. watcher.C <- &resp
  92. }
  93. if len(newWatchers) == 0 {
  94. // we have notified all the watchers at this path
  95. // delete the map
  96. delete(w.watchers, currPath)
  97. } else {
  98. w.watchers[currPath] = newWatchers
  99. }
  100. }
  101. }
  102. return nil
  103. }
  104. // stopWatchers stops all the watchers
  105. // This function is used when the etcd recovery from a snapshot at runtime
  106. func (w *WatcherHub) stopWatchers() {
  107. for _, subWatchers := range w.watchers {
  108. for _, watcher := range subWatchers {
  109. watcher.C <- nil
  110. }
  111. }
  112. w.watchers = nil
  113. }