Browse Source

init cancel watcher

Xiang Li 12 years ago
parent
commit
5e499456f0
7 changed files with 65 additions and 50 deletions
  1. 2 2
      server/v1/watch_key_handler.go
  2. 3 2
      server/v2/get_handler.go
  3. 7 6
      store/store.go
  4. 20 18
      store/store_test.go
  5. 5 4
      store/watcher.go
  6. 22 15
      store/watcher_hub.go
  7. 6 3
      store/watcher_test.go

+ 2 - 2
server/v1/watch_key_handler.go

@@ -25,11 +25,11 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	}
 	}
 
 
 	// Start the watcher on the store.
 	// Start the watcher on the store.
-	c, err := s.Store().Watch(key, false, sinceIndex)
+	watcher, err := s.Store().NewWatcher(key, false, sinceIndex)
 	if err != nil {
 	if err != nil {
 		return etcdErr.NewError(500, key, s.Store().Index())
 		return etcdErr.NewError(500, key, s.Store().Index())
 	}
 	}
-	event := <-c
+	event := <-watcher.EventChan
 
 
 	// Convert event to a response and write to client.
 	// Convert event to a response and write to client.
 	b, _ := json.Marshal(event.Response(s.Store().Index()))
 	b, _ := json.Marshal(event.Response(s.Store().Index()))

+ 3 - 2
server/v2/get_handler.go

@@ -55,7 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		}
 		}
 
 
 		// Start the watcher on the store.
 		// Start the watcher on the store.
-		eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
+		watcher, err := s.Store().NewWatcher(key, recursive, sinceIndex)
 		if err != nil {
 		if err != nil {
 			return etcdErr.NewError(500, key, s.Store().Index())
 			return etcdErr.NewError(500, key, s.Store().Index())
 		}
 		}
@@ -65,8 +65,9 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 
 		select {
 		select {
 		case <-closeChan:
 		case <-closeChan:
+			watcher.Remove()
 			return nil
 			return nil
-		case event = <-eventChan:
+		case event = <-watcher.EventChan:
 		}
 		}
 
 
 	} else { //get
 	} else { //get

+ 7 - 6
store/store.go

@@ -52,7 +52,8 @@ type Store interface {
 		value string, expireTime time.Time) (*Event, error)
 		value string, expireTime time.Time) (*Event, error)
 	Delete(nodePath string, recursive, dir bool) (*Event, error)
 	Delete(nodePath string, recursive, dir bool) (*Event, error)
 	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
 	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
-	Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
+
+	NewWatcher(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error)
 
 
 	Save() ([]byte, error)
 	Save() ([]byte, error)
 	Recovery(state []byte) error
 	Recovery(state []byte) error
@@ -340,21 +341,21 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	return e, nil
 	return e, nil
 }
 }
 
 
-func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
+func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watcher, error) {
 	key = path.Clean(path.Join("/", key))
 	key = path.Clean(path.Join("/", key))
 	nextIndex := s.CurrentIndex + 1
 	nextIndex := s.CurrentIndex + 1
 
 
 	s.worldLock.RLock()
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 	defer s.worldLock.RUnlock()
 
 
-	var c <-chan *Event
+	var w *Watcher
 	var err *etcdErr.Error
 	var err *etcdErr.Error
 
 
 	if sinceIndex == 0 {
 	if sinceIndex == 0 {
-		c, err = s.WatcherHub.watch(key, recursive, nextIndex)
+		w, err = s.WatcherHub.watch(key, recursive, nextIndex)
 
 
 	} else {
 	} else {
-		c, err = s.WatcherHub.watch(key, recursive, sinceIndex)
+		w, err = s.WatcherHub.watch(key, recursive, sinceIndex)
 	}
 	}
 
 
 	if err != nil {
 	if err != nil {
@@ -364,7 +365,7 @@ func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Ev
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return c, nil
+	return w, nil
 }
 }
 
 
 // walk function walks all the nodePath and apply the walkFunc on each directory
 // walk function walks all the nodePath and apply the walkFunc on each directory

+ 20 - 18
store/store_test.go

@@ -446,7 +446,8 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 // Ensure that the store can watch for key creation.
 // Ensure that the store can watch for key creation.
 func TestStoreWatchCreate(t *testing.T) {
 func TestStoreWatchCreate(t *testing.T) {
 	s := newStore()
 	s := newStore()
-	c, _ := s.Watch("/foo", false, 0)
+	w, _ := s.NewWatcher("/foo", false, 0)
+	c := w.EventChan
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
 	e := nbselect(c)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Action, "create", "")
@@ -458,9 +459,9 @@ func TestStoreWatchCreate(t *testing.T) {
 // Ensure that the store can watch for recursive key creation.
 // Ensure that the store can watch for recursive key creation.
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 	s := newStore()
 	s := newStore()
-	c, _ := s.Watch("/foo", true, 0)
+	w, _ := s.NewWatcher("/foo", true, 0)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 }
 }
@@ -469,9 +470,9 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
 func TestStoreWatchUpdate(t *testing.T) {
 func TestStoreWatchUpdate(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	c, _ := s.Watch("/foo", false, 0)
+	w, _ := s.NewWatcher("/foo", false, 0)
 	s.Update("/foo", "baz", Permanent)
 	s.Update("/foo", "baz", Permanent)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 }
 }
@@ -480,9 +481,9 @@ func TestStoreWatchUpdate(t *testing.T) {
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	c, _ := s.Watch("/foo", true, 0)
+	w, _ := s.NewWatcher("/foo", true, 0)
 	s.Update("/foo/bar", "baz", Permanent)
 	s.Update("/foo/bar", "baz", Permanent)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 }
 }
@@ -491,9 +492,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
 func TestStoreWatchDelete(t *testing.T) {
 func TestStoreWatchDelete(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	c, _ := s.Watch("/foo", false, 0)
+	w, _ := s.NewWatcher("/foo", false, 0)
 	s.Delete("/foo", false, false)
 	s.Delete("/foo", false, false)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 }
 }
@@ -502,9 +503,9 @@ func TestStoreWatchDelete(t *testing.T) {
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	c, _ := s.Watch("/foo", true, 0)
+	w, _ := s.NewWatcher("/foo", true, 0)
 	s.Delete("/foo/bar", false, false)
 	s.Delete("/foo/bar", false, false)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 }
 }
@@ -513,9 +514,9 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	c, _ := s.Watch("/foo", false, 0)
+	w, _ := s.NewWatcher("/foo", false, 0)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 }
 }
@@ -524,9 +525,9 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	c, _ := s.Watch("/foo", true, 0)
+	w, _ := s.NewWatcher("/foo", true, 0)
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
-	e := nbselect(c)
+	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
 }
 }
@@ -544,15 +545,16 @@ func TestStoreWatchExpire(t *testing.T) {
 	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 
 
-	c, _ := s.Watch("/", true, 0)
+	w, _ := s.NewWatcher("/", true, 0)
+	c := w.EventChan
 	e := nbselect(c)
 	e := nbselect(c)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
 	time.Sleep(600 * time.Millisecond)
 	time.Sleep(600 * time.Millisecond)
 	e = nbselect(c)
 	e = nbselect(c)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
-	c, _ = s.Watch("/", true, 4)
-	e = nbselect(c)
+	w, _ = s.NewWatcher("/", true, 4)
+	e = nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foofoo", "")
 	assert.Equal(t, e.Node.Key, "/foofoo", "")
 }
 }

+ 5 - 4
store/watcher.go

@@ -16,15 +16,16 @@ limitations under the License.
 
 
 package store
 package store
 
 
-type watcher struct {
-	eventChan  chan *Event
+type Watcher struct {
+	EventChan  chan *Event
 	recursive  bool
 	recursive  bool
 	sinceIndex uint64
 	sinceIndex uint64
+	Remove     func()
 }
 }
 
 
 // notify function notifies the watcher. If the watcher interests in the given path,
 // notify function notifies the watcher. If the watcher interests in the given path,
 // the function will return true.
 // the function will return true.
-func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
+func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	// watcher is interested the path in three cases and under one condition
 	// watcher is interested the path in three cases and under one condition
 	// the condition is that the event happens after the watcher's sinceIndex
 	// the condition is that the event happens after the watcher's sinceIndex
 
 
@@ -41,7 +42,7 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// should get notified even if "/foo" is not the path it is watching.
 	// should get notified even if "/foo" is not the path it is watching.
 	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
 	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
-		w.eventChan <- e
+		w.EventChan <- e
 		return true
 		return true
 	}
 	}
 	return false
 	return false

+ 22 - 15
store/watcher_hub.go

@@ -36,41 +36,48 @@ func newWatchHub(capacity int) *watcherHub {
 // If recursive is true, the first change after index under key will be sent to the event channel.
 // If recursive is true, the first change after index under key will be sent to the event channel.
 // If recursive is false, the first change after index at key will be sent to the event channel.
 // If recursive is false, the first change after index at key will be sent to the event channel.
 // If index is zero, watch will start from the current index + 1.
 // If index is zero, watch will start from the current index + 1.
-func (wh *watcherHub) watch(key string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
+func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) {
 	event, err := wh.EventHistory.scan(key, recursive, index)
 	event, err := wh.EventHistory.scan(key, recursive, index)
 
 
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	eventChan := make(chan *Event, 1) // use a buffered channel
+	w := &Watcher{
+		EventChan:  make(chan *Event, 1), // use a buffered channel
+		recursive:  recursive,
+		sinceIndex: index,
+	}
 
 
 	if event != nil {
 	if event != nil {
-		eventChan <- event
+		w.EventChan <- event
 
 
-		return eventChan, nil
-	}
-
-	w := &watcher{
-		eventChan:  eventChan,
-		recursive:  recursive,
-		sinceIndex: index,
+		return w, nil
 	}
 	}
 
 
 	l, ok := wh.watchers[key]
 	l, ok := wh.watchers[key]
 
 
+	var elem *list.Element
+
 	if ok { // add the new watcher to the back of the list
 	if ok { // add the new watcher to the back of the list
-		l.PushBack(w)
+		elem = l.PushBack(w)
 
 
 	} else { // create a new list and add the new watcher
 	} else { // create a new list and add the new watcher
-		l := list.New()
-		l.PushBack(w)
+		l = list.New()
+		elem = l.PushBack(w)
 		wh.watchers[key] = l
 		wh.watchers[key] = l
 	}
 	}
 
 
+	w.Remove = func() {
+		l.Remove(elem)
+		if l.Len() == 0 {
+			delete(wh.watchers, key)
+		}
+	}
+
 	atomic.AddInt64(&wh.count, 1)
 	atomic.AddInt64(&wh.count, 1)
 
 
-	return eventChan, nil
+	return w, nil
 }
 }
 
 
 // notify function accepts an event and notify to the watchers.
 // notify function accepts an event and notify to the watchers.
@@ -109,7 +116,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 
 
 			next := curr.Next() // save reference to the next one in the list
 			next := curr.Next() // save reference to the next one in the list
 
 
-			w, _ := curr.Value.(*watcher)
+			w, _ := curr.Value.(*Watcher)
 
 
 			if w.notify(e, e.Node.Key == path, deleted) {
 			if w.notify(e, e.Node.Key == path, deleted) {
 
 

+ 6 - 3
store/watcher_test.go

@@ -23,10 +23,11 @@ import (
 func TestWatcher(t *testing.T) {
 func TestWatcher(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	wh := s.WatcherHub
 	wh := s.WatcherHub
-	c, err := wh.watch("/foo", true, 1)
+	w, err := wh.watch("/foo", true, 1)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("%v", err)
 		t.Fatalf("%v", err)
 	}
 	}
+	c := w.EventChan
 
 
 	select {
 	select {
 	case <-c:
 	case <-c:
@@ -45,7 +46,8 @@ func TestWatcher(t *testing.T) {
 		t.Fatal("recv != send")
 		t.Fatal("recv != send")
 	}
 	}
 
 
-	c, _ = wh.watch("/foo", false, 2)
+	w, _ = wh.watch("/foo", false, 2)
+	c = w.EventChan
 
 
 	e = newEvent(Create, "/foo/bar", 2, 2)
 	e = newEvent(Create, "/foo/bar", 2, 2)
 
 
@@ -69,7 +71,8 @@ func TestWatcher(t *testing.T) {
 	}
 	}
 
 
 	// ensure we are doing exact matching rather than prefix matching
 	// ensure we are doing exact matching rather than prefix matching
-	c, _ = wh.watch("/fo", true, 1)
+	w, _ = wh.watch("/fo", true, 1)
+	c = w.EventChan
 
 
 	select {
 	select {
 	case re = <-c:
 	case re = <-c: