Browse Source

Merge pull request #15 from evan-gu/newStore

add watcher for expiration, add expiration for TestAndSet
Xiang Li 12 years ago
parent
commit
d2407dff9f
9 changed files with 191 additions and 91 deletions
  1. 27 7
      store/event.go
  2. 6 9
      store/event_test.go
  3. 45 8
      store/node.go
  4. 5 1
      store/stats.go
  5. 28 2
      store/stats_test.go
  6. 17 19
      store/store.go
  7. 52 30
      store/store_test.go
  8. 4 7
      store/watcher.go
  9. 7 8
      store/watcher_test.go

+ 27 - 7
store/event.go

@@ -15,6 +15,9 @@ const (
 	Update     = "update"
 	Delete     = "delete"
 	TestAndSet = "testAndSet"
+	Expire     = "expire"
+	UndefIndex = 0
+	UndefTerm  = 0
 )
 
 type Event struct {
@@ -73,7 +76,6 @@ func (eq *eventQueue) back() int {
 }
 
 func (eq *eventQueue) insert(e *Event) {
-
 	index := (eq.back() + 1) % eq.Capacity
 
 	eq.Events[index] = e
@@ -89,6 +91,9 @@ func (eq *eventQueue) insert(e *Event) {
 type EventHistory struct {
 	Queue      eventQueue
 	StartIndex uint64
+	LastIndex  uint64
+	LastTerm   uint64
+	DupCnt     uint64 // help to compute the watching point with duplicated indexes in the queue
 	rwl        sync.RWMutex
 }
 
@@ -102,13 +107,31 @@ func newEventHistory(capacity int) *EventHistory {
 }
 
 // addEvent function adds event into the eventHistory
-func (eh *EventHistory) addEvent(e *Event) {
+func (eh *EventHistory) addEvent(e *Event) *Event {
 	eh.rwl.Lock()
 	defer eh.rwl.Unlock()
 
+	var duped uint64
+
+	if e.Index == UndefIndex {
+		e.Index = eh.LastIndex
+		duped = 1
+	}
+
+	if e.Term == UndefTerm {
+		e.Term = eh.LastTerm
+		duped = 1
+	}
+
 	eh.Queue.insert(e)
 
 	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
+
+	eh.LastIndex = e.Index
+	eh.LastTerm = e.Term
+	eh.DupCnt += duped
+
+	return e
 }
 
 // scan function is enumerating events from the index in history and
@@ -117,7 +140,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	start := index - eh.StartIndex
+	start := index - eh.StartIndex + eh.DupCnt
 
 	// the index should locate after the event history's StartIndex
 	// and before its size
@@ -126,8 +149,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 		// TODO: Add error type
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
-				fmt.Sprintf("prefix:%v index:%v", prefix, index),
-			)
+				fmt.Sprintf("prefix:%v index:%v", prefix, index))
 	}
 
 	if start >= uint64(eh.Queue.Size) {
@@ -149,13 +171,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 			return nil, nil
 		}
 	}
-
 }
 
 // clone will be protected by a stop-world lock
 // do not need to obtain internal lock
 func (eh *EventHistory) clone() *EventHistory {
-
 	clonedQueue := eventQueue{
 		Capacity: eh.Queue.Capacity,
 		Events:   make([]*Event, eh.Queue.Capacity),

+ 6 - 9
store/event_test.go

@@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) {
 
 	// Add
 	for i := 0; i < 200; i++ {
-		e := newEvent(Create, "/foo", uint64(i), 0)
+		e := newEvent(Create, "/foo", uint64(i), 1)
 		eh.addEvent(e)
 	}
 
@@ -28,20 +28,18 @@ func TestEventQueue(t *testing.T) {
 		}
 		j++
 		i = (i + 1) % eh.Queue.Capacity
-
 	}
-
 }
 
 func TestScanHistory(t *testing.T) {
 	eh := newEventHistory(100)
 
 	// Add
-	eh.addEvent(newEvent(Create, "/foo", 1, 0))
-	eh.addEvent(newEvent(Create, "/foo/bar", 2, 0))
-	eh.addEvent(newEvent(Create, "/foo/foo", 3, 0))
-	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0))
-	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0))
+	eh.addEvent(newEvent(Create, "/foo", 1, 1))
+	eh.addEvent(newEvent(Create, "/foo/bar", 2, 1))
+	eh.addEvent(newEvent(Create, "/foo/foo", 3, 1))
+	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1))
+	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1))
 
 	e, err := eh.scan("/foo", 1)
 	if err != nil || e.Index != 1 {
@@ -65,5 +63,4 @@ func TestScanHistory(t *testing.T) {
 	if e != nil {
 		t.Fatalf("bad index shoud reuturn nil")
 	}
-
 }

+ 45 - 8
store/node.go

@@ -87,6 +87,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error {
 
 			n.stopExpire <- true
 			n.status = removed
+
 		}
 
 		return nil
@@ -185,7 +186,6 @@ func (n *Node) GetFile(name string) (*Node, error) {
 	}
 
 	return nil, nil
-
 }
 
 // Add function adds a node to the receiver node.
@@ -214,7 +214,6 @@ func (n *Node) Add(child *Node) error {
 	n.Children[name] = child
 
 	return nil
-
 }
 
 // Clone function clone the node recursively and return the new node.
@@ -236,24 +235,36 @@ func (n *Node) Clone() *Node {
 	return clone
 }
 
-func (n *Node) recoverAndclean() {
+func (n *Node) recoverAndclean(s *Store) {
 	if n.IsDir() {
 		for _, child := range n.Children {
 			child.Parent = n
-			child.recoverAndclean()
+			child.recoverAndclean(s)
 		}
 	}
 
 	n.stopExpire = make(chan bool, 1)
 
-	n.Expire()
+	n.Expire(s)
 }
 
-func (n *Node) Expire() {
+// Expire function will test if the node is expired.
+// if the node is already expired, delete the node and return.
+// if the node is permemant (this shouldn't happen), return at once.
+// else wait for a period time, then remove the node. and notify the watchhub.
+func (n *Node) Expire(s *Store) {
 	expired, duration := n.IsExpired()
 
 	if expired { // has been expired
+
+		// since the parent function of Expire() runs serially,
+		// there is no need for lock here
+		e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
+		s.WatcherHub.notify(e)
+
 		n.Remove(true, nil)
+		s.Stats.Inc(ExpireCount)
+
 		return
 	}
 
@@ -265,13 +276,24 @@ func (n *Node) Expire() {
 		select {
 		// if timeout, delete the node
 		case <-time.After(duration):
+
+			// Lock the worldLock to avoid race on s.WatchHub,
+			// and the race with other slibling nodes on their common parent.
+			s.worldLock.Lock()
+
+			e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
+			s.WatcherHub.notify(e)
+
 			n.Remove(true, nil)
+			s.Stats.Inc(ExpireCount)
+
+			s.worldLock.Unlock()
+
 			return
 
 		// if stopped, return
 		case <-n.stopExpire:
 			return
-
 		}
 	}()
 }
@@ -285,7 +307,6 @@ func (n *Node) IsHidden() bool {
 	_, name := path.Split(n.Path)
 
 	return name[0] == '_'
-
 }
 
 func (n *Node) IsPermanent() bool {
@@ -346,6 +367,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		if sorted {
 			sort.Sort(pair)
 		}
+
 		return pair
 	}
 
@@ -354,3 +376,18 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		Value: n.Value,
 	}
 }
+
+func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
+	if !n.IsPermanent() {
+		expired, _ := n.IsExpired()
+
+		if !expired {
+			n.stopExpire <- true // suspend it to modify the expiration
+		}
+	}
+
+	if expireTime.Sub(Permanent) != 0 {
+		n.ExpireTime = expireTime
+		n.Expire(s)
+	}
+}

+ 5 - 1
store/stats.go

@@ -16,6 +16,7 @@ const (
 	TestAndSetFail    = 107
 	GetSuccess        = 110
 	GetFail           = 111
+	ExpireCount       = 112
 )
 
 type Stats struct {
@@ -39,6 +40,7 @@ type Stats struct {
 	// Number of testAndSet requests
 	TestAndSetSuccess uint64 `json:"testAndSetSuccess"`
 	TestAndSetFail    uint64 `json:"testAndSetFail"`
+	ExpireCount       uint64 `json:"expireCount"`
 
 	Watchers uint64 `json:"watchers"`
 }
@@ -51,7 +53,7 @@ func newStats() *Stats {
 func (s *Stats) clone() *Stats {
 	return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
 		s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail,
-		s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers}
+		s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount}
 }
 
 // Status() return the statistics info of etcd storage its recent start
@@ -93,5 +95,7 @@ func (s *Stats) Inc(field int) {
 		atomic.AddUint64(&s.TestAndSetSuccess, 1)
 	case TestAndSetFail:
 		atomic.AddUint64(&s.TestAndSetFail, 1)
+	case ExpireCount:
+		atomic.AddUint64(&s.ExpireCount, 1)
 	}
 }

+ 28 - 2
store/stats_test.go

@@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1)
+		_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			SetFail++
 		} else {
@@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	time.Sleep(time.Second * 3)
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {
@@ -35,7 +37,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1)
+		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			UpdateFail++
 		} else {
@@ -43,6 +45,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	time.Sleep(time.Second * 3)
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {
@@ -136,4 +140,26 @@ func TestBasicStats(t *testing.T) {
 		t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail)
 	}
 
+	s = New()
+	SetSuccess = 0
+	SetFail = 0
+
+	for _, k := range keys {
+		i++
+		_, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1)
+		if err != nil {
+			SetFail++
+		} else {
+			SetSuccess++
+		}
+	}
+
+	time.Sleep(6 * time.Second)
+
+	ExpireCount := SetSuccess
+
+	if ExpireCount != s.Stats.ExpireCount {
+		t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount)
+	}
+
 }

+ 17 - 19
store/store.go

@@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
 
 	// Node with TTL
 	if expireTime.Sub(Permanent) != 0 {
-		n.Expire()
+		n.Expire(s)
 		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	}
@@ -171,15 +171,16 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(UpdateFail)
+
 		return nil, err
 	}
 
 	e := newEvent(Update, nodePath, s.Index, s.Term)
 
 	if n.IsDir() { // if the node is a directory, we can only update ttl
-
 		if len(value) != 0 {
 			s.Stats.Inc(UpdateFail)
+
 			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 		}
 
@@ -194,19 +195,14 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 	}
 
 	// update ttl
-	if !n.IsPermanent() {
-		n.stopExpire <- true
-	}
-
-	if expireTime.Sub(Permanent) != 0 {
-		n.ExpireTime = expireTime
-		n.Expire()
-		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
-	}
+	n.UpdateTTL(expireTime, s)
 
+	e.Expiration = &n.ExpireTime
+	e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	s.WatcherHub.notify(e)
+
 	s.Stats.Inc(UpdateSuccess)
+
 	return e, nil
 }
 
@@ -216,31 +212,33 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
-	f, err := s.internalGet(nodePath, index, term)
+	n, err := s.internalGet(nodePath, index, term)
 
 	if err != nil {
 		s.Stats.Inc(TestAndSetFail)
 		return nil, err
 	}
 
-	if f.IsDir() { // can only test and set file
+	if n.IsDir() { // can only test and set file
 		s.Stats.Inc(TestAndSetFail)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 	}
 
-	if f.Value == prevValue || f.ModifiedIndex == prevIndex {
+	if n.Value == prevValue || n.ModifiedIndex == prevIndex {
 		// if test succeed, write the value
 		e := newEvent(TestAndSet, nodePath, index, term)
-		e.PrevValue = f.Value
+		e.PrevValue = n.Value
 		e.Value = value
-		f.Write(value, index, term)
+		n.Write(value, index, term)
+
+		n.UpdateTTL(expireTime, s)
 
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(TestAndSetSuccess)
 		return e, nil
 	}
 
-	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
+	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
 	s.Stats.Inc(TestAndSetFail)
 	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
 }
@@ -404,7 +402,7 @@ func (s *Store) Recovery(state []byte) error {
 		return err
 	}
 
-	s.Root.recoverAndclean()
+	s.Root.recoverAndclean(s)
 	return nil
 }
 

+ 52 - 30
store/store_test.go

@@ -52,7 +52,6 @@ func TestCreateAndGet(t *testing.T) {
 	if err != nil {
 		t.Fatal("Cannot create /fooDir/bar = bar")
 	}
-
 }
 
 func TestUpdateFile(t *testing.T) {
@@ -81,7 +80,6 @@ func TestUpdateFile(t *testing.T) {
 	}
 
 	// create a directory, update its ttl, to see if it will be deleted
-
 	_, err = s.Create("/foo/foo", "", Permanent, 3, 1)
 
 	if err != nil {
@@ -237,7 +235,6 @@ func TestRemove(t *testing.T) {
 	if err == nil || err.Error() != "Key Not Found" {
 		t.Fatalf("can get the node after deletion ")
 	}
-
 }
 
 func TestExpire(t *testing.T) {
@@ -280,7 +277,6 @@ func TestExpire(t *testing.T) {
 	if err != nil {
 		t.Fatalf("cannot delete the node before expiration", err.Error())
 	}
-
 }
 
 func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
@@ -314,70 +310,95 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 	if e.PrevValue != "car" || e.Value != "bar" {
 		t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar")
 	}
-
 }
 
 func TestWatch(t *testing.T) {
 	s := New()
 	// watch at a deeper path
-	c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
 	s.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
 
 	e := nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
-		t.Fatal("watch for Create node fails")
+	if e.Key != "/foo/foo/foo" || e.Action != Create {
+		t.Fatal("watch for Create node fails ", e)
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1)
 	s.Update("/foo/foo/foo", "car", Permanent, 2, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
-		t.Fatal("watch for Update node fails")
+	if e.Key != "/foo/foo/foo" || e.Action != Update {
+		t.Fatal("watch for Update node fails ", e)
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1)
 	s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
+	if e.Key != "/foo/foo/foo" || e.Action != TestAndSet {
 		t.Fatal("watch for TestAndSet node fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1)
 	s.Delete("/foo", true, 4, 1) //recursively delete
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo" {
-		t.Fatal("watch for Delete node fails")
+	if e.Key != "/foo" || e.Action != Delete {
+		t.Fatal("watch for Delete node fails ", e)
 	}
 
 	// watch at a prefix
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 4, 1)
 	s.Create("/foo/foo/boo", "bar", Permanent, 5, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Create {
 		t.Fatal("watch for Create subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 5, 1)
 	s.Update("/foo/foo/boo", "foo", Permanent, 6, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Update {
 		t.Fatal("watch for Update subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 6, 1)
 	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != TestAndSet {
 		t.Fatal("watch for TestAndSet subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 7, 1)
 	s.Delete("/foo/foo/boo", false, 8, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Delete {
 		t.Fatal("watch for Delete subdirectory fails")
 	}
 
+	// watch expire
+	s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1)
+	c, _ = s.Watch("/foo", true, 0, 9, 1)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 9 {
+		t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
+	}
+
+	s.Create("/foo/foo/boo", "foo", Permanent, 10, 1)
+	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
+	c, _ = s.Watch("/foo", true, 0, 11, 1)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 11 {
+		t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
+	}
+
+	s.Create("/foo/foo/boo", "foo", Permanent, 12, 1)
+	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
+	c, _ = s.Watch("/foo", true, 0, 13, 1)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 {
+		t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e)
+	}
 }
 
 func TestSort(t *testing.T) {
@@ -442,7 +463,7 @@ func TestSaveAndRecover(t *testing.T) {
 	b, err := s.Save()
 
 	cloneFs := New()
-	time.Sleep(time.Second)
+	time.Sleep(2 * time.Second)
 
 	cloneFs.Recovery(b)
 
@@ -453,11 +474,15 @@ func TestSaveAndRecover(t *testing.T) {
 		}
 	}
 
+	// lock to avoid racing with Expire()
+	s.worldLock.RLock()
+	defer s.worldLock.RUnlock()
+
 	if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
 		t.Fatal("Error recovered event history start index")
 	}
 
-	for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ {
+	for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
 		if s.WatcherHub.EventHistory.Queue.Events[i].Key !=
 			cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {
 			t.Fatal("Error recovered event history")
@@ -469,7 +494,6 @@ func TestSaveAndRecover(t *testing.T) {
 	if err == nil || err.Error() != "Key Not Found" {
 		t.Fatalf("can get the node after deletion ")
 	}
-
 }
 
 // GenKeys randomly generate num of keys with max depth
@@ -485,6 +509,7 @@ func GenKeys(num int, depth int) []string {
 			keys[i] += "/" + strconv.Itoa(rand.Int())
 		}
 	}
+
 	return keys
 }
 
@@ -504,11 +529,9 @@ func createAndGet(s *Store, path string, t *testing.T) {
 	if e.Value != "bar" {
 		t.Fatalf("expect value of %s is bar [%s]", path, e.Value)
 	}
-
 }
 
 func recursiveTestSort(k KeyValuePair, t *testing.T) {
-
 	for i, v := range k.KVPairs[:len(k.KVPairs)-1] {
 		if v.Key >= k.KVPairs[i+1].Key {
 			t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key)
@@ -517,7 +540,6 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) {
 		if v.Dir {
 			recursiveTestSort(v, t)
 		}
-
 	}
 
 	if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir {

+ 4 - 7
store/watcher.go

@@ -47,7 +47,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
 	w := &watcher{
 		eventChan:  eventChan,
 		recursive:  recursive,
-		sinceIndex: index,
+		sinceIndex: index - 1, // to catch Expire()
 	}
 
 	l, ok := wh.watchers[prefix]
@@ -74,21 +74,19 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 		notifiedAll := true
 
 		for {
-
 			if curr == nil { // we have reached the end of the list
-
 				if notifiedAll {
 					// if we have notified all watcher in the list
 					// we can delete the list
 					delete(wh.watchers, path)
 				}
+
 				break
 			}
 
 			next := curr.Next() // save the next
 
 			w, _ := curr.Value.(*watcher)
-
 			if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
 				w.eventChan <- e
 				l.Remove(curr)
@@ -98,12 +96,13 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 			}
 
 			curr = next // go to the next one
-
 		}
 	}
 }
 
 func (wh *watcherHub) notify(e *Event) {
+	e = wh.EventHistory.addEvent(e)
+
 	segments := strings.Split(e.Key, "/")
 
 	currPath := "/"
@@ -113,8 +112,6 @@ func (wh *watcherHub) notify(e *Event) {
 		currPath = path.Join(currPath, segment)
 		wh.notifyWithPath(e, currPath, false)
 	}
-
-	wh.EventHistory.addEvent(e)
 }
 
 func (wh *watcherHub) clone() *watcherHub {

+ 7 - 8
store/watcher_test.go

@@ -7,8 +7,7 @@ import (
 func TestWatcher(t *testing.T) {
 	s := New()
 	wh := s.WatcherHub
-	c, err := wh.watch("/foo", true, 0)
-
+	c, err := wh.watch("/foo", true, 1)
 	if err != nil {
 		t.Fatal("%v", err)
 	}
@@ -20,7 +19,7 @@ func TestWatcher(t *testing.T) {
 		// do nothing
 	}
 
-	e := newEvent(Create, "/foo/bar", 1, 0)
+	e := newEvent(Create, "/foo/bar", 1, 1)
 
 	wh.notify(e)
 
@@ -30,20 +29,20 @@ func TestWatcher(t *testing.T) {
 		t.Fatal("recv != send")
 	}
 
-	c, _ = wh.watch("/foo", false, 0)
+	c, _ = wh.watch("/foo", false, 2)
 
-	e = newEvent(Create, "/foo/bar", 1, 0)
+	e = newEvent(Create, "/foo/bar", 2, 1)
 
 	wh.notify(e)
 
 	select {
-	case <-c:
-		t.Fatal("should not receive from channel if not recursive")
+	case re = <-c:
+		t.Fatal("should not receive from channel if not recursive ", re)
 	default:
 		// do nothing
 	}
 
-	e = newEvent(Create, "/foo", 1, 0)
+	e = newEvent(Create, "/foo", 3, 1)
 
 	wh.notify(e)