Browse Source

add ExpireCount and some test case

evan-gu 12 years ago
parent
commit
3ae316ac38
4 changed files with 45 additions and 14 deletions
  1. 8 7
      store/node.go
  2. 5 1
      store/stats.go
  3. 28 2
      store/stats_test.go
  4. 4 4
      store/store.go

+ 8 - 7
store/node.go

@@ -236,20 +236,20 @@ func (n *Node) Clone() *Node {
 	return clone
 }
 
-func (n *Node) recoverAndclean(WatcherHub *watcherHub) {
+func (n *Node) recoverAndclean(s *Store) {
 	if n.IsDir() {
 		for _, child := range n.Children {
 			child.Parent = n
-			child.recoverAndclean(WatcherHub)
+			child.recoverAndclean(s)
 		}
 	}
 
 	n.stopExpire = make(chan bool, 1)
 
-	n.Expire(WatcherHub)
+	n.Expire(s)
 }
 
-func (n *Node) Expire(WatcherHub *watcherHub) {
+func (n *Node) Expire(s *Store) {
 	expired, duration := n.IsExpired()
 
 	if expired { // has been expired
@@ -267,7 +267,8 @@ func (n *Node) Expire(WatcherHub *watcherHub) {
 		case <-time.After(duration):
 			n.Remove(true, nil)
 
-			WatcherHub.notifyWithoutIndex(Expire, n.Path)
+			s.Stats.Inc(ExpireCount)
+			s.WatcherHub.notifyWithoutIndex(Expire, n.Path)
 
 			return
 
@@ -358,13 +359,13 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 	}
 }
 
-func (n *Node) UpdateTTL(expireTime time.Time, WatcherHub *watcherHub) {
+func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
 	if !n.IsPermanent() {
 		n.stopExpire <- true // suspend it to modify the expiration
 	}
 
 	if expireTime.Sub(Permanent) != 0 {
 		n.ExpireTime = expireTime
-		n.Expire(WatcherHub)
+		n.Expire(s)
 	}
 }

+ 5 - 1
store/stats.go

@@ -16,6 +16,7 @@ const (
 	TestAndSetFail    = 107
 	GetSuccess        = 110
 	GetFail           = 111
+	ExpireCount       = 112
 )
 
 type Stats struct {
@@ -39,6 +40,7 @@ type Stats struct {
 	// Number of testAndSet requests
 	TestAndSetSuccess uint64 `json:"testAndSetSuccess"`
 	TestAndSetFail    uint64 `json:"testAndSetFail"`
+	ExpireCount       uint64 `json:"expireCount"`
 
 	Watchers uint64 `json:"watchers"`
 }
@@ -51,7 +53,7 @@ func newStats() *Stats {
 func (s *Stats) clone() *Stats {
 	return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
 		s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail,
-		s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers}
+		s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount}
 }
 
 // Status() return the statistics info of etcd storage its recent start
@@ -93,5 +95,7 @@ func (s *Stats) Inc(field int) {
 		atomic.AddUint64(&s.TestAndSetSuccess, 1)
 	case TestAndSetFail:
 		atomic.AddUint64(&s.TestAndSetFail, 1)
+	case ExpireCount:
+		atomic.AddUint64(&s.ExpireCount, 1)
 	}
 }

+ 28 - 2
store/stats_test.go

@@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1)
+		_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			SetFail++
 		} else {
@@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	time.Sleep(time.Second * 3)
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {
@@ -35,7 +37,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1)
+		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			UpdateFail++
 		} else {
@@ -43,6 +45,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	time.Sleep(time.Second * 3)
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {
@@ -136,4 +140,26 @@ func TestBasicStats(t *testing.T) {
 		t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail)
 	}
 
+	s = New()
+	SetSuccess = 0
+	SetFail = 0
+
+	for _, k := range keys {
+		i++
+		_, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1)
+		if err != nil {
+			SetFail++
+		} else {
+			SetSuccess++
+		}
+	}
+
+	time.Sleep(6 * time.Second)
+
+	ExpireCount := SetSuccess
+
+	if ExpireCount != s.Stats.ExpireCount {
+		t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount)
+	}
+
 }

+ 4 - 4
store/store.go

@@ -150,7 +150,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
 
 	// Node with TTL
 	if expireTime.Sub(Permanent) != 0 {
-		n.Expire(s.WatcherHub)
+		n.Expire(s)
 		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	}
@@ -194,7 +194,7 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 	}
 
 	// update ttl
-	n.UpdateTTL(expireTime, s.WatcherHub)
+	n.UpdateTTL(expireTime, s)
 
 	e.Expiration = &n.ExpireTime
 	e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
@@ -230,7 +230,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 		e.Value = value
 		n.Write(value, index, term)
 
-		n.UpdateTTL(expireTime, s.WatcherHub)
+		n.UpdateTTL(expireTime, s)
 
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(TestAndSetSuccess)
@@ -401,7 +401,7 @@ func (s *Store) Recovery(state []byte) error {
 		return err
 	}
 
-	s.Root.recoverAndclean(s.WatcherHub)
+	s.Root.recoverAndclean(s)
 	return nil
 }