Browse Source

add some basic stats, and test case

evan-gu 12 years ago
parent
commit
a568c6dc75

+ 1 - 0
etcd_handlers.go

@@ -224,6 +224,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	w.WriteHeader(http.StatusOK)
 	//w.Write(etcdStore.Stats())
+	w.Write(etcdFs.Stats.GetStats())
 	w.Write(r.Stats())
 	return nil
 }

+ 1 - 1
file_system/event.go

@@ -6,7 +6,7 @@ import (
 	"sync"
 	"time"
 
-	etcdErr "github.com/coreos/etcd/error"
+	etcdErr "github.com/xiangli-cmu/etcd/error"
 )
 
 const (

+ 35 - 7
file_system/file_system.go

@@ -8,7 +8,7 @@ import (
 	"strings"
 	"time"
 
-	etcdErr "github.com/coreos/etcd/error"
+	etcdErr "github.com/xiangli-cmu/etcd/error"
 )
 
 type FileSystem struct {
@@ -16,13 +16,16 @@ type FileSystem struct {
 	WatcherHub *watcherHub
 	Index      uint64
 	Term       uint64
+	Stats      *EtcdStats
 }
 
 func New() *FileSystem {
-	return &FileSystem{
-		Root:       newDir("/", 0, 0, nil, "", Permanent),
-		WatcherHub: newWatchHub(1000),
-	}
+	fs := new(FileSystem)
+	fs.Root = newDir("/", 0, 0, nil, "", Permanent)
+	fs.Stats = newStats()
+	fs.WatcherHub = newWatchHub(1000, fs.Stats)
+
+	return fs
 
 }
 
@@ -32,6 +35,7 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64,
 	n, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsGetsMiss)
 		return nil, err
 	}
 
@@ -78,6 +82,7 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64,
 		e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
 	}
 
+	fs.Stats.IncStats(StatsGetsHit)
 	return e, nil
 }
 
@@ -91,12 +96,14 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 	_, err := fs.InternalGet(nodePath, index, term)
 
 	if err == nil { // key already exists
+		fs.Stats.IncStats(StatsSetsMiss)
 		return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath)
 	}
 
 	etcdError, _ := err.(etcdErr.Error)
 
 	if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
+		fs.Stats.IncStats(StatsSetsMiss)
 		return nil, err
 	}
 
@@ -106,6 +113,7 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 	d, err := fs.walk(dir, fs.checkDir)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsSetsMiss)
 		return nil, err
 	}
 
@@ -128,6 +136,7 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 	err = d.Add(n)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsSetsMiss)
 		return nil, err
 	}
 
@@ -139,6 +148,7 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 	}
 
 	fs.WatcherHub.notify(e)
+	fs.Stats.IncStats(StatsSetsHit)
 	return e, nil
 }
 
@@ -149,6 +159,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 	n, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil { // if the node does not exist, return error
+		fs.Stats.IncStats(StatsUpdatesMiss)
 		return nil, err
 	}
 
@@ -157,6 +168,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 	if n.IsDir() { // if the node is a directory, we can only update ttl
 
 		if len(value) != 0 {
+			fs.Stats.IncStats(StatsUpdatesMiss)
 			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 		}
 
@@ -183,6 +195,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 	}
 
 	fs.WatcherHub.notify(e)
+	fs.Stats.IncStats(StatsUpdatesHit)
 	return e, nil
 }
 
@@ -192,11 +205,12 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui
 	f, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil {
-
+		fs.Stats.IncStats(StatsTestAndSetsMiss)
 		return nil, err
 	}
 
 	if f.IsDir() { // can only test and set file
+		fs.Stats.IncStats(StatsTestAndSetsMiss)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 	}
 
@@ -208,11 +222,12 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui
 		f.Write(value, index, term)
 
 		fs.WatcherHub.notify(e)
-
+		fs.Stats.IncStats(StatsTestAndSetsHit)
 		return e, nil
 	}
 
 	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
+	fs.Stats.IncStats(StatsTestAndSetsMiss)
 	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
 }
 
@@ -222,6 +237,7 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term
 	n, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil { // if the node does not exist, return error
+		fs.Stats.IncStats(StatsDeletesMiss)
 		return nil, err
 	}
 
@@ -240,10 +256,12 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term
 	err = n.Remove(recursive, callback)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsDeletesMiss)
 		return nil, err
 	}
 
 	fs.WatcherHub.notify(e)
+	fs.Stats.IncStats(StatsDeletesHit)
 
 	return e, nil
 }
@@ -333,15 +351,23 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
 // Save function will not save the parent field of the node. Or there will
 // be cyclic dependencies issue for the json package.
 func (fs *FileSystem) Save() ([]byte, error) {
+
+	fs.Stats.IncStats(StatsSaveHit)
 	cloneFs := New()
 	cloneFs.Root = fs.Root.Clone()
 
 	b, err := json.Marshal(fs)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsSaveMiss)
+		fs.Stats.rwlock.Lock()
+		fs.Stats.SaveHit-- // restore the savehit
+		fs.Stats.rwlock.Unlock()
+
 		return nil, err
 	}
 
+	fs.Stats.IncStats(StatsSaveHit)
 	return b, nil
 }
 
@@ -353,9 +379,11 @@ func (fs *FileSystem) Recovery(state []byte) error {
 	err := json.Unmarshal(state, fs)
 
 	if err != nil {
+		fs.Stats.IncStats(StatsRecoveryMiss)
 		return err
 	}
 
 	fs.Root.recoverAndclean()
+	fs.Stats.IncStats(StatsRecoveryHit)
 	return nil
 }

+ 4 - 5
file_system/file_system_test.go

@@ -439,14 +439,12 @@ func TestSaveAndRecover(t *testing.T) {
 
 	expire := time.Now().Add(time.Second)
 	fs.Create("/foo/foo", "bar", expire, 1, 1)
-
-	b := fs.Save()
+	b, err := fs.Save()
 
 	cloneFs := New()
-
 	time.Sleep(time.Second)
 
-	cloneFs.Recover(b)
+	cloneFs.Recovery(b)
 
 	for i, k := range keys {
 		_, err := cloneFs.Get(k, false, false, uint64(i), 1)
@@ -466,7 +464,7 @@ func TestSaveAndRecover(t *testing.T) {
 		}
 	}
 
-	_, err := fs.Get("/foo/foo", false, false, 1, 1)
+	_, err = fs.Get("/foo/foo", false, false, 1, 1)
 
 	if err == nil || err.Error() != "Key Not Found" {
 		t.Fatalf("can get the node after deletion ")
@@ -476,6 +474,7 @@ func TestSaveAndRecover(t *testing.T) {
 
 // GenKeys randomly generate num of keys with max depth
 func GenKeys(num int, depth int) []string {
+	rand.Seed(time.Now().UnixNano())
 	keys := make([]string, num)
 	for i := 0; i < num; i++ {
 

+ 1 - 1
file_system/node.go

@@ -6,7 +6,7 @@ import (
 	"sync"
 	"time"
 
-	etcdErr "github.com/coreos/etcd/error"
+	etcdErr "github.com/xiangli-cmu/etcd/error"
 )
 
 var (

+ 146 - 0
file_system/stats.go

@@ -0,0 +1,146 @@
+package fileSystem
+
+import (
+	"encoding/json"
+	"sync"
+)
+
+const (
+	// Operations that will be running serializely
+	StatsSetsHit         = 100
+	StatsSetsMiss        = 101
+	StatsDeletesHit      = 102
+	StatsDeletesMiss     = 103
+	StatsUpdatesHit      = 104
+	StatsUpdatesMiss     = 105
+	StatsTestAndSetsHit  = 106
+	StatsTestAndSetsMiss = 107
+	StatsRecoveryHit     = 108
+	StatsRecoveryMiss    = 109
+
+	// concurrent operations
+	StatsGetsHit  = 200
+	StatsGetsMiss = 201
+
+	StatsWatchHit      = 300
+	StatsWatchMiss     = 301
+	StatsInWatchingNum = 302
+
+	StatsSaveHit  = 400
+	StatsSaveMiss = 401
+)
+
+type EtcdStats struct {
+
+	// Lock for synchronization
+	rwlock sync.RWMutex
+
+	// Number of get requests
+	GetsHit  uint64 `json:"gets_hits"`
+	GetsMiss uint64 `json:"gets_misses"`
+
+	// Number of sets requests
+	SetsHit  uint64 `json:"sets_hits"`
+	SetsMiss uint64 `json:"sets_misses"`
+
+	// Number of delete requests
+	DeletesHit  uint64 `json:"deletes_hits"`
+	DeletesMiss uint64 `json:"deletes_misses"`
+
+	// Number of update requests
+	UpdatesHit  uint64 `json:"updates_hits"`
+	UpdatesMiss uint64 `json:"updates_misses"`
+
+	// Number of testAndSet requests
+	TestAndSetsHit  uint64 `json:"testAndSets_hits"`
+	TestAndSetsMiss uint64 `json:"testAndSets_misses"`
+
+	// Number of Watch requests
+	WatchHit      uint64 `json:"watch_hit"`
+	WatchMiss     uint64 `json:"watch_miss"`
+	InWatchingNum uint64 `json:"in_watching_number"`
+
+	// Number of save requests
+	SaveHit  uint64 `json:"save_hit"`
+	SaveMiss uint64 `json:"save_miss"`
+
+	// Number of recovery requests
+	RecoveryHit  uint64 `json:"recovery_hit"`
+	RecoveryMiss uint64 `json:"recovery_miss"`
+}
+
+func newStats() *EtcdStats {
+	e := new(EtcdStats)
+	return e
+}
+
+// Status() return the statistics info of etcd storage its recent start
+func (e *EtcdStats) GetStats() []byte {
+	b, _ := json.Marshal(e)
+	return b
+}
+
+func (e *EtcdStats) TotalReads() uint64 {
+	return e.GetsHit + e.GetsMiss
+}
+
+func (e *EtcdStats) TotalWrites() uint64 {
+	return e.SetsHit + e.SetsMiss +
+		e.DeletesHit + e.DeletesMiss +
+		e.UpdatesHit + e.UpdatesMiss +
+		e.TestAndSetsHit + e.TestAndSetsMiss
+}
+
+func (e *EtcdStats) IncStats(field int) {
+	if field >= 200 {
+		e.rwlock.Lock()
+
+		switch field {
+		case StatsGetsHit:
+			e.GetsHit++
+		case StatsGetsMiss:
+			e.GetsMiss++
+		case StatsWatchHit:
+			e.WatchHit++
+		case StatsWatchMiss:
+			e.WatchMiss++
+		case StatsInWatchingNum:
+			e.InWatchingNum++
+		case StatsSaveHit:
+			e.SaveHit++
+		case StatsSaveMiss:
+			e.SaveMiss++
+		}
+
+		e.rwlock.Unlock()
+
+	} else {
+		e.rwlock.RLock()
+
+		switch field {
+		case StatsSetsHit:
+			e.SetsHit++
+		case StatsSetsMiss:
+			e.SetsMiss++
+		case StatsDeletesHit:
+			e.DeletesHit++
+		case StatsDeletesMiss:
+			e.DeletesMiss++
+		case StatsUpdatesHit:
+			e.UpdatesHit++
+		case StatsUpdatesMiss:
+			e.UpdatesMiss++
+		case StatsTestAndSetsHit:
+			e.TestAndSetsHit++
+		case StatsTestAndSetsMiss:
+			e.TestAndSetsMiss++
+		case StatsRecoveryHit:
+			e.RecoveryHit++
+		case StatsRecoveryMiss:
+			e.RecoveryMiss++
+		}
+
+		e.rwlock.RUnlock()
+	}
+
+}

+ 221 - 0
file_system/stats_test.go

@@ -0,0 +1,221 @@
+package fileSystem
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+	//"fmt"
+)
+
+func TestBasicStats(t *testing.T) {
+	fs := New()
+	keys := GenKeys(rand.Intn(100), 5)
+
+	i := uint64(0)
+	GetsHit := uint64(0)
+	GetsMiss := uint64(0)
+	SetsHit := uint64(0)
+	SetsMiss := uint64(0)
+	DeletesHit := uint64(0)
+	DeletesMiss := uint64(0)
+	UpdatesHit := uint64(0)
+	UpdatesMiss := uint64(0)
+	TestAndSetsHit := uint64(0)
+	TestAndSetsMiss := uint64(0)
+	WatchHit := uint64(0)
+	WatchMiss := uint64(0)
+	InWatchingNum := uint64(0)
+	SaveHit := uint64(0)
+	SaveMiss := uint64(0)
+	RecoveryHit := uint64(0)
+	RecoveryMiss := uint64(0)
+
+	for _, k := range keys {
+		i++
+		_, err := fs.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1)
+		if err != nil {
+			SetsMiss++
+		} else {
+			SetsHit++
+		}
+	}
+
+	for _, k := range keys {
+		_, err := fs.Get(k, false, false, i, 1)
+		if err != nil {
+			GetsMiss++
+		} else {
+			GetsHit++
+		}
+	}
+
+	for _, k := range keys {
+		i++
+		_, err := fs.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1)
+		if err != nil {
+			UpdatesMiss++
+		} else {
+			UpdatesHit++
+		}
+	}
+
+	for _, k := range keys {
+		_, err := fs.Get(k, false, false, i, 1)
+		if err != nil {
+			GetsMiss++
+		} else {
+			GetsHit++
+		}
+	}
+
+	for _, k := range keys {
+		i++
+		_, err := fs.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1)
+		if err != nil {
+			TestAndSetsMiss++
+		} else {
+			TestAndSetsHit++
+		}
+	}
+
+	//fmt.Printf("#TestAndSet [%d]\n", TestAndSetsHit)
+
+	for _, k := range keys {
+		_, err := fs.Watch(k, false, 0, i, 1)
+		if err != nil {
+			WatchMiss++
+		} else {
+			WatchHit++
+			InWatchingNum++
+		}
+	}
+
+	//fmt.Printf("#Watch [%d]\n", WatchHit)
+
+	for _, k := range keys {
+		_, err := fs.Get(k, false, false, i, 1)
+		if err != nil {
+			GetsMiss++
+		} else {
+			GetsHit++
+		}
+	}
+
+	//fmt.Println("fs.index ", fs.Index)
+	for j := 0; j < 5; j++ {
+		b := make([]byte, 10)
+		err := fs.Recovery(b)
+		if err != nil {
+			RecoveryMiss++
+		}
+
+		b, err = fs.Save()
+		if err != nil {
+			SaveMiss++
+		} else {
+			SaveHit++
+		}
+
+		err = fs.Recovery(b)
+		if err != nil {
+			RecoveryMiss++
+		} else {
+			RecoveryHit++
+		}
+	}
+	//fmt.Println("fs.index after ", fs.Index)
+	//fmt.Println("stats.inwatching ", fs.Stats.InWatchingNum)
+
+	for _, k := range keys {
+		i++
+		_, err := fs.Delete(k, false, i, 1)
+		if err != nil {
+			DeletesMiss++
+		} else {
+			InWatchingNum--
+			DeletesHit++
+		}
+	}
+
+	//fmt.Printf("#Delete [%d] stats.deletehit [%d] \n", DeletesHit, fs.Stats.DeletesHit)
+
+	for _, k := range keys {
+		_, err := fs.Get(k, false, false, i, 1)
+		if err != nil {
+			GetsMiss++
+		} else {
+			GetsHit++
+		}
+	}
+
+	if GetsHit != fs.Stats.GetsHit {
+		t.Fatalf("GetsHit [%d] != Stats.GetsHit [%d]", GetsHit, fs.Stats.GetsHit)
+	}
+
+	if GetsMiss != fs.Stats.GetsMiss {
+		t.Fatalf("GetsMiss [%d] != Stats.GetsMiss [%d]", GetsMiss, fs.Stats.GetsMiss)
+	}
+
+	if SetsHit != fs.Stats.SetsHit {
+		t.Fatalf("SetsHit [%d] != Stats.SetsHit [%d]", SetsHit, fs.Stats.SetsHit)
+	}
+
+	if SetsMiss != fs.Stats.SetsMiss {
+		t.Fatalf("SetsMiss [%d] != Stats.SetsMiss [%d]", SetsMiss, fs.Stats.SetsMiss)
+	}
+
+	if DeletesHit != fs.Stats.DeletesHit {
+		t.Fatalf("DeletesHit [%d] != Stats.DeletesHit [%d]", DeletesHit, fs.Stats.DeletesHit)
+	}
+
+	if DeletesMiss != fs.Stats.DeletesMiss {
+		t.Fatalf("DeletesMiss [%d] != Stats.DeletesMiss [%d]", DeletesMiss, fs.Stats.DeletesMiss)
+	}
+
+	if UpdatesHit != fs.Stats.UpdatesHit {
+		t.Fatalf("UpdatesHit [%d] != Stats.UpdatesHit [%d]", UpdatesHit, fs.Stats.UpdatesHit)
+	}
+
+	if UpdatesMiss != fs.Stats.UpdatesMiss {
+		t.Fatalf("UpdatesMiss [%d] != Stats.UpdatesMiss [%d]", UpdatesMiss, fs.Stats.UpdatesMiss)
+	}
+
+	if TestAndSetsHit != fs.Stats.TestAndSetsHit {
+		t.Fatalf("TestAndSetsHit [%d] != Stats.TestAndSetsHit [%d]", TestAndSetsHit, fs.Stats.TestAndSetsHit)
+	}
+
+	if TestAndSetsMiss != fs.Stats.TestAndSetsMiss {
+		t.Fatalf("TestAndSetsMiss [%d] != Stats.TestAndSetsMiss [%d]", TestAndSetsMiss, fs.Stats.TestAndSetsMiss)
+	}
+
+	if SaveHit != fs.Stats.SaveHit {
+		t.Fatalf("SaveHit [%d] != Stats.SaveHit [%d]", SaveHit, fs.Stats.SaveHit)
+	}
+
+	if SaveMiss != fs.Stats.SaveMiss {
+		t.Fatalf("SaveMiss [%d] != Stats.SaveMiss [%d]", SaveMiss, fs.Stats.SaveMiss)
+	}
+
+	if WatchHit != fs.Stats.WatchHit {
+		t.Fatalf("WatchHit [%d] != Stats.WatchHit [%d]", WatchHit, fs.Stats.WatchHit)
+	}
+
+	if WatchMiss != fs.Stats.WatchMiss {
+		t.Fatalf("WatchMiss [%d] != Stats.WatchMiss [%d]", WatchMiss, fs.Stats.WatchMiss)
+	}
+
+	if InWatchingNum != fs.Stats.InWatchingNum {
+		t.Fatalf("InWatchingNum [%d] != Stats.InWatchingNum [%d]", InWatchingNum, fs.Stats.InWatchingNum)
+	}
+
+	if RecoveryHit != fs.Stats.RecoveryHit {
+		t.Fatalf("RecoveryHit [%d] != Stats.RecoveryHit [%d]", RecoveryHit, fs.Stats.RecoveryHit)
+	}
+
+	if RecoveryMiss != fs.Stats.RecoveryMiss {
+		t.Fatalf("RecoveryMiss [%d] != Stats.RecoveryMiss [%d]", RecoveryMiss, fs.Stats.RecoveryMiss)
+	}
+
+	//fmt.Println(GetsHit, GetsMiss, SetsHit, SetsMiss, DeletesHit, DeletesMiss, UpdatesHit, UpdatesMiss, TestAndSetsHit, TestAndSetsMiss, WatchHit, WatchMiss, InWatchingNum, SaveHit, SaveMiss, RecoveryHit, RecoveryMiss)
+
+}

+ 10 - 2
file_system/watcher.go

@@ -10,6 +10,7 @@ type watcherHub struct {
 	watchers     map[string]*list.List
 	count        uint64 // current number of watchers
 	EventHistory *EventHistory
+	Stats        *EtcdStats
 }
 
 type watcher struct {
@@ -18,10 +19,11 @@ type watcher struct {
 	sinceIndex uint64
 }
 
-func newWatchHub(capacity int) *watcherHub {
+func newWatchHub(capacity int, stats *EtcdStats) *watcherHub {
 	return &watcherHub{
 		watchers:     make(map[string]*list.List),
 		EventHistory: newEventHistory(capacity),
+		Stats:        stats,
 	}
 }
 
@@ -35,9 +37,13 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
 	e, err := wh.EventHistory.scan(prefix, index)
 
 	if err != nil {
+		wh.Stats.IncStats(StatsWatchMiss)
 		return nil, err
 	}
 
+	wh.Stats.IncStats(StatsWatchHit)
+	wh.Stats.IncStats(StatsInWatchingNum)
+
 	if e != nil {
 		eventChan <- e
 		return eventChan, nil
@@ -67,7 +73,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 	l, ok := wh.watchers[path]
 
 	if ok {
-
 		curr := l.Front()
 		notifiedAll := true
 
@@ -89,6 +94,9 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 
 			if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
 				w.eventChan <- e
+				wh.Stats.rwlock.Lock() // lock the InWatchingNum
+				wh.Stats.InWatchingNum--
+				wh.Stats.rwlock.Unlock()
 				l.Remove(curr)
 			} else {
 				notifiedAll = false

+ 2 - 1
file_system/watcher_test.go

@@ -5,7 +5,8 @@ import (
 )
 
 func TestWatcher(t *testing.T) {
-	wh := newWatchHub(100)
+	fs := New()
+	wh := fs.WatcherHub
 	c, err := wh.watch("/foo", true, 0)
 
 	if err != nil {