Browse Source

refactor(watcher) change newWatcher to Watch

Xiang Li 12 years ago
parent
commit
fa3b4a7941

+ 1 - 1
server/v1/watch_key_handler.go

@@ -25,7 +25,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	}
 	}
 
 
 	// Start the watcher on the store.
 	// Start the watcher on the store.
-	watcher, err := s.Store().NewWatcher(key, false, sinceIndex)
+	watcher, err := s.Store().Watch(key, false, sinceIndex)
 	if err != nil {
 	if err != nil {
 		return etcdErr.NewError(500, key, s.Store().Index())
 		return etcdErr.NewError(500, key, s.Store().Index())
 	}
 	}

+ 1 - 1
server/v2/get_handler.go

@@ -55,7 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		}
 		}
 
 
 		// Start the watcher on the store.
 		// Start the watcher on the store.
-		watcher, err := s.Store().NewWatcher(key, recursive, sinceIndex)
+		watcher, err := s.Store().Watch(key, recursive, sinceIndex)
 		if err != nil {
 		if err != nil {
 			return etcdErr.NewError(500, key, s.Store().Index())
 			return etcdErr.NewError(500, key, s.Store().Index())
 		}
 		}

+ 4 - 4
store/store.go

@@ -53,7 +53,7 @@ type Store interface {
 	Delete(nodePath string, recursive, dir bool) (*Event, error)
 	Delete(nodePath string, recursive, dir bool) (*Event, error)
 	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
 	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
 
 
-	NewWatcher(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error)
+	Watch(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error)
 
 
 	Save() ([]byte, error)
 	Save() ([]byte, error)
 	Recovery(state []byte) error
 	Recovery(state []byte) error
@@ -341,7 +341,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	return e, nil
 	return e, nil
 }
 }
 
 
-func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watcher, error) {
+func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, error) {
 	key = path.Clean(path.Join("/", key))
 	key = path.Clean(path.Join("/", key))
 	nextIndex := s.CurrentIndex + 1
 	nextIndex := s.CurrentIndex + 1
 
 
@@ -352,10 +352,10 @@ func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watc
 	var err *etcdErr.Error
 	var err *etcdErr.Error
 
 
 	if sinceIndex == 0 {
 	if sinceIndex == 0 {
-		w, err = s.WatcherHub.newWatcher(key, recursive, nextIndex)
+		w, err = s.WatcherHub.watch(key, recursive, nextIndex)
 
 
 	} else {
 	} else {
-		w, err = s.WatcherHub.newWatcher(key, recursive, sinceIndex)
+		w, err = s.WatcherHub.watch(key, recursive, sinceIndex)
 	}
 	}
 
 
 	if err != nil {
 	if err != nil {

+ 10 - 10
store/store_test.go

@@ -446,7 +446,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 // Ensure that the store can watch for key creation.
 // Ensure that the store can watch for key creation.
 func TestStoreWatchCreate(t *testing.T) {
 func TestStoreWatchCreate(t *testing.T) {
 	s := newStore()
 	s := newStore()
-	w, _ := s.NewWatcher("/foo", false, 0)
+	w, _ := s.Watch("/foo", false, 0)
 	c := w.EventChan
 	c := w.EventChan
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
 	e := nbselect(c)
 	e := nbselect(c)
@@ -459,7 +459,7 @@ func TestStoreWatchCreate(t *testing.T) {
 // Ensure that the store can watch for recursive key creation.
 // Ensure that the store can watch for recursive key creation.
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 	s := newStore()
 	s := newStore()
-	w, _ := s.NewWatcher("/foo", true, 0)
+	w, _ := s.Watch("/foo", true, 0)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Action, "create", "")
@@ -470,7 +470,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
 func TestStoreWatchUpdate(t *testing.T) {
 func TestStoreWatchUpdate(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	w, _ := s.NewWatcher("/foo", false, 0)
+	w, _ := s.Watch("/foo", false, 0)
 	s.Update("/foo", "baz", Permanent)
 	s.Update("/foo", "baz", Permanent)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Action, "update", "")
@@ -481,7 +481,7 @@ func TestStoreWatchUpdate(t *testing.T) {
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	w, _ := s.NewWatcher("/foo", true, 0)
+	w, _ := s.Watch("/foo", true, 0)
 	s.Update("/foo/bar", "baz", Permanent)
 	s.Update("/foo/bar", "baz", Permanent)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Action, "update", "")
@@ -492,7 +492,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
 func TestStoreWatchDelete(t *testing.T) {
 func TestStoreWatchDelete(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	w, _ := s.NewWatcher("/foo", false, 0)
+	w, _ := s.Watch("/foo", false, 0)
 	s.Delete("/foo", false, false)
 	s.Delete("/foo", false, false)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Action, "delete", "")
@@ -503,7 +503,7 @@ func TestStoreWatchDelete(t *testing.T) {
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	w, _ := s.NewWatcher("/foo", true, 0)
+	w, _ := s.Watch("/foo", true, 0)
 	s.Delete("/foo/bar", false, false)
 	s.Delete("/foo/bar", false, false)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Action, "delete", "")
@@ -514,7 +514,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", false, "bar", false, Permanent)
 	s.Create("/foo", false, "bar", false, Permanent)
-	w, _ := s.NewWatcher("/foo", false, 0)
+	w, _ := s.Watch("/foo", false, 0)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
@@ -525,7 +525,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo/bar", false, "baz", false, Permanent)
 	s.Create("/foo/bar", false, "baz", false, Permanent)
-	w, _ := s.NewWatcher("/foo", true, 0)
+	w, _ := s.Watch("/foo", true, 0)
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	e := nbselect(w.EventChan)
 	e := nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
@@ -545,7 +545,7 @@ func TestStoreWatchExpire(t *testing.T) {
 	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 
 
-	w, _ := s.NewWatcher("/", true, 0)
+	w, _ := s.Watch("/", true, 0)
 	c := w.EventChan
 	c := w.EventChan
 	e := nbselect(c)
 	e := nbselect(c)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
@@ -553,7 +553,7 @@ func TestStoreWatchExpire(t *testing.T) {
 	e = nbselect(c)
 	e = nbselect(c)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
-	w, _ = s.NewWatcher("/", true, 4)
+	w, _ = s.Watch("/", true, 4)
 	e = nbselect(w.EventChan)
 	e = nbselect(w.EventChan)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Node.Key, "/foofoo", "")
 	assert.Equal(t, e.Node.Key, "/foofoo", "")

+ 12 - 1
store/watcher.go

@@ -20,7 +20,7 @@ type Watcher struct {
 	EventChan  chan *Event
 	EventChan  chan *Event
 	recursive  bool
 	recursive  bool
 	sinceIndex uint64
 	sinceIndex uint64
-	Remove     func()
+	remove     func()
 }
 }
 
 
 // notify function notifies the watcher. If the watcher interests in the given path,
 // notify function notifies the watcher. If the watcher interests in the given path,
@@ -47,3 +47,14 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+// Remove removes the watcher from watcherHub
+func (w *Watcher) Remove() {
+	if w.remove != nil {
+		w.remove()
+	} else {
+		// We attached a remove function to watcher
+		// Other pkg cannot change it, so this should not happen
+		panic("missing Watcher remove function")
+	}
+}

+ 3 - 3
store/watcher_hub.go

@@ -34,11 +34,11 @@ func newWatchHub(capacity int) *watcherHub {
 	}
 	}
 }
 }
 
 
-// newWatcher function returns a watcher.
+// Watch function returns a watcher.
 // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
 // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
 // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
 // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
 // If index is zero, watch will start from the current index + 1.
 // If index is zero, watch will start from the current index + 1.
-func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) {
+func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) {
 	event, err := wh.EventHistory.scan(key, recursive, index)
 	event, err := wh.EventHistory.scan(key, recursive, index)
 
 
 	if err != nil {
 	if err != nil {
@@ -72,7 +72,7 @@ func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Wat
 		wh.watchers[key] = l
 		wh.watchers[key] = l
 	}
 	}
 
 
-	w.Remove = func() {
+	w.remove = func() {
 		wh.mutex.Lock()
 		wh.mutex.Lock()
 		defer wh.mutex.Unlock()
 		defer wh.mutex.Unlock()
 		l.Remove(elem)
 		l.Remove(elem)

+ 3 - 3
store/watcher_test.go

@@ -23,7 +23,7 @@ import (
 func TestWatcher(t *testing.T) {
 func TestWatcher(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	wh := s.WatcherHub
 	wh := s.WatcherHub
-	w, err := wh.newWatcher("/foo", true, 1)
+	w, err := wh.watch("/foo", true, 1)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("%v", err)
 		t.Fatalf("%v", err)
 	}
 	}
@@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) {
 		t.Fatal("recv != send")
 		t.Fatal("recv != send")
 	}
 	}
 
 
-	w, _ = wh.newWatcher("/foo", false, 2)
+	w, _ = wh.watch("/foo", false, 2)
 	c = w.EventChan
 	c = w.EventChan
 
 
 	e = newEvent(Create, "/foo/bar", 2, 2)
 	e = newEvent(Create, "/foo/bar", 2, 2)
@@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) {
 	}
 	}
 
 
 	// ensure we are doing exact matching rather than prefix matching
 	// ensure we are doing exact matching rather than prefix matching
-	w, _ = wh.newWatcher("/fo", true, 1)
+	w, _ = wh.watch("/fo", true, 1)
 	c = w.EventChan
 	c = w.EventChan
 
 
 	select {
 	select {