Browse Source

add watcher for expiration, add expiration for TestAndSet, add related test case

evan-gu 12 years ago
parent
commit
35724319c9
5 changed files with 103 additions and 25 deletions
  1. 17 2
      store/event.go
  2. 18 4
      store/node.go
  3. 14 17
      store/store.go
  4. 39 2
      store/store_test.go
  5. 15 0
      store/watcher.go

+ 17 - 2
store/event.go

@@ -15,6 +15,7 @@ const (
 	Update     = "update"
 	Update     = "update"
 	Delete     = "delete"
 	Delete     = "delete"
 	TestAndSet = "testAndSet"
 	TestAndSet = "testAndSet"
+	Expire     = "expire"
 )
 )
 
 
 type Event struct {
 type Event struct {
@@ -111,6 +112,21 @@ func (eh *EventHistory) addEvent(e *Event) {
 	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
 	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
 }
 }
 
 
+// addEvent with the last event's index and term
+func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) {
+	eh.rwl.Lock()
+	defer eh.rwl.Unlock()
+
+	LastEvent := eh.Queue.Events[eh.Queue.back()]
+	e = newEvent(action, key, LastEvent.Index, LastEvent.Term)
+
+	eh.Queue.insert(e)
+
+	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
+
+	return e
+}
+
 // scan function is enumerating events from the index in history and
 // scan function is enumerating events from the index in history and
 // stops till the first point where the key has identified prefix
 // stops till the first point where the key has identified prefix
 func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
@@ -126,8 +142,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 		// TODO: Add error type
 		// TODO: Add error type
 		return nil,
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
-				fmt.Sprintf("prefix:%v index:%v", prefix, index),
-			)
+				fmt.Sprintf("prefix:%v index:%v", prefix, index))
 	}
 	}
 
 
 	if start >= uint64(eh.Queue.Size) {
 	if start >= uint64(eh.Queue.Size) {

+ 18 - 4
store/node.go

@@ -236,20 +236,20 @@ func (n *Node) Clone() *Node {
 	return clone
 	return clone
 }
 }
 
 
-func (n *Node) recoverAndclean() {
+func (n *Node) recoverAndclean(WatcherHub *watcherHub) {
 	if n.IsDir() {
 	if n.IsDir() {
 		for _, child := range n.Children {
 		for _, child := range n.Children {
 			child.Parent = n
 			child.Parent = n
-			child.recoverAndclean()
+			child.recoverAndclean(WatcherHub)
 		}
 		}
 	}
 	}
 
 
 	n.stopExpire = make(chan bool, 1)
 	n.stopExpire = make(chan bool, 1)
 
 
-	n.Expire()
+	n.Expire(WatcherHub)
 }
 }
 
 
-func (n *Node) Expire() {
+func (n *Node) Expire(WatcherHub *watcherHub) {
 	expired, duration := n.IsExpired()
 	expired, duration := n.IsExpired()
 
 
 	if expired { // has been expired
 	if expired { // has been expired
@@ -266,6 +266,9 @@ func (n *Node) Expire() {
 		// if timeout, delete the node
 		// if timeout, delete the node
 		case <-time.After(duration):
 		case <-time.After(duration):
 			n.Remove(true, nil)
 			n.Remove(true, nil)
+
+			WatcherHub.notifyWithoutIndex(Expire, n.Path)
+
 			return
 			return
 
 
 		// if stopped, return
 		// if stopped, return
@@ -354,3 +357,14 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		Value: n.Value,
 		Value: n.Value,
 	}
 	}
 }
 }
+
+func (n *Node) UpdateTTL(expireTime time.Time, WatcherHub *watcherHub) {
+	if !n.IsPermanent() {
+		n.stopExpire <- true // suspend it to modify the expiration
+	}
+
+	if expireTime.Sub(Permanent) != 0 {
+		n.ExpireTime = expireTime
+		n.Expire(WatcherHub)
+	}
+}

+ 14 - 17
store/store.go

@@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
 
 
 	// Node with TTL
 	// Node with TTL
 	if expireTime.Sub(Permanent) != 0 {
 	if expireTime.Sub(Permanent) != 0 {
-		n.Expire()
+		n.Expire(s.WatcherHub)
 		e.Expiration = &n.ExpireTime
 		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	}
 	}
@@ -194,19 +194,14 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 	}
 	}
 
 
 	// update ttl
 	// update ttl
-	if !n.IsPermanent() {
-		n.stopExpire <- true
-	}
+	n.UpdateTTL(expireTime, s.WatcherHub)
 
 
-	if expireTime.Sub(Permanent) != 0 {
-		n.ExpireTime = expireTime
-		n.Expire()
-		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
-	}
+	e.Expiration = &n.ExpireTime
+	e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
 	s.Stats.Inc(UpdateSuccess)
 	s.Stats.Inc(UpdateSuccess)
+
 	return e, nil
 	return e, nil
 }
 }
 
 
@@ -216,31 +211,33 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 	s.worldLock.RLock()
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 	defer s.worldLock.RUnlock()
 
 
-	f, err := s.internalGet(nodePath, index, term)
+	n, err := s.internalGet(nodePath, index, term)
 
 
 	if err != nil {
 	if err != nil {
 		s.Stats.Inc(TestAndSetFail)
 		s.Stats.Inc(TestAndSetFail)
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	if f.IsDir() { // can only test and set file
+	if n.IsDir() { // can only test and set file
 		s.Stats.Inc(TestAndSetFail)
 		s.Stats.Inc(TestAndSetFail)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 	}
 	}
 
 
-	if f.Value == prevValue || f.ModifiedIndex == prevIndex {
+	if n.Value == prevValue || n.ModifiedIndex == prevIndex {
 		// if test succeed, write the value
 		// if test succeed, write the value
 		e := newEvent(TestAndSet, nodePath, index, term)
 		e := newEvent(TestAndSet, nodePath, index, term)
-		e.PrevValue = f.Value
+		e.PrevValue = n.Value
 		e.Value = value
 		e.Value = value
-		f.Write(value, index, term)
+		n.Write(value, index, term)
+
+		n.UpdateTTL(expireTime, s.WatcherHub)
 
 
 		s.WatcherHub.notify(e)
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(TestAndSetSuccess)
 		s.Stats.Inc(TestAndSetSuccess)
 		return e, nil
 		return e, nil
 	}
 	}
 
 
-	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
+	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
 	s.Stats.Inc(TestAndSetFail)
 	s.Stats.Inc(TestAndSetFail)
 	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
 	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
 }
 }
@@ -404,7 +401,7 @@ func (s *Store) Recovery(state []byte) error {
 		return err
 		return err
 	}
 	}
 
 
-	s.Root.recoverAndclean()
+	s.Root.recoverAndclean(s.WatcherHub)
 	return nil
 	return nil
 }
 }
 
 

+ 39 - 2
store/store_test.go

@@ -378,6 +378,33 @@ func TestWatch(t *testing.T) {
 		t.Fatal("watch for Delete subdirectory fails")
 		t.Fatal("watch for Delete subdirectory fails")
 	}
 	}
 
 
+	// watch expire
+	s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1)
+	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Index != 9 {
+		t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
+	}
+
+	s.Create("/foo/foo/boo", "foo", Permanent, 10, 1)
+	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
+	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Index != 11 {
+		t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
+	}
+
+	s.Create("/foo/foo/boo", "foo", Permanent, 12, 1)
+	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
+	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	time.Sleep(time.Second * 2)
+	e = nonblockingRetrive(c)
+	if e.Key != "/foo/foo/boo" || e.Index != 13 {
+		t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e)
+	}
+
 }
 }
 
 
 func TestSort(t *testing.T) {
 func TestSort(t *testing.T) {
@@ -442,7 +469,7 @@ func TestSaveAndRecover(t *testing.T) {
 	b, err := s.Save()
 	b, err := s.Save()
 
 
 	cloneFs := New()
 	cloneFs := New()
-	time.Sleep(time.Second)
+	time.Sleep(2 * time.Second)
 
 
 	cloneFs.Recovery(b)
 	cloneFs.Recovery(b)
 
 
@@ -457,7 +484,17 @@ func TestSaveAndRecover(t *testing.T) {
 		t.Fatal("Error recovered event history start index")
 		t.Fatal("Error recovered event history start index")
 	}
 	}
 
 
-	for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ {
+	//t.Log("watcherhub.size: ", s.WatcherHub.EventHistory.Queue.Size)
+	//for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ {
+	//	t.Log(s.WatcherHub.EventHistory.Queue.Events[i])
+	//}
+	//
+	//t.Log("ClonedWatcherhub.size: ", cloneFs.WatcherHub.EventHistory.Queue.Size)
+	//for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
+	//	t.Log(cloneFs.WatcherHub.EventHistory.Queue.Events[i])
+	//}
+
+	for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
 		if s.WatcherHub.EventHistory.Queue.Events[i].Key !=
 		if s.WatcherHub.EventHistory.Queue.Events[i].Key !=
 			cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {
 			cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {
 			t.Fatal("Error recovered event history")
 			t.Fatal("Error recovered event history")

+ 15 - 0
store/watcher.go

@@ -117,6 +117,21 @@ func (wh *watcherHub) notify(e *Event) {
 	wh.EventHistory.addEvent(e)
 	wh.EventHistory.addEvent(e)
 }
 }
 
 
+// notify with last event's index and term
+func (wh *watcherHub) notifyWithoutIndex(action, key string) {
+	e := wh.EventHistory.addEventWithouIndex(action, key)
+
+	segments := strings.Split(e.Key, "/")
+
+	currPath := "/"
+
+	// walk through all the paths
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+		wh.notifyWithPath(e, currPath, false)
+	}
+}
+
 func (wh *watcherHub) clone() *watcherHub {
 func (wh *watcherHub) clone() *watcherHub {
 	clonedHistory := wh.EventHistory.clone()
 	clonedHistory := wh.EventHistory.clone()