Browse Source

Merge pull request #54 from xiangli-cmu/master

Update
Xiang Li 12 years ago
parent
commit
b8c89a147b
10 changed files with 143 additions and 49 deletions
  1. 1 1
      client_handlers.go
  2. 5 1
      command.go
  3. 4 1
      error.go
  4. 1 1
      etcd.go
  5. 5 5
      store/keyword_test.go
  6. 9 4
      store/store.go
  7. 5 0
      store/tree.go
  8. 18 7
      store/watcher.go
  9. 0 29
      store/watcher_test.bak
  10. 95 0
      store/watcher_test.go

+ 1 - 1
client_handlers.go

@@ -300,8 +300,8 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 
 	if body, err := command.Apply(raftServer); err != nil {
-		warnf("Unable to do watch command: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
+		w.Write(newJsonError(500, key))
 	} else {
 		w.WriteHeader(http.StatusOK)
 

+ 5 - 1
command.go

@@ -93,7 +93,7 @@ func (c *WatchCommand) CommandName() string {
 
 func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 	// create a new watcher
-	watcher := store.CreateWatcher()
+	watcher := store.NewWatcher()
 
 	// add to the watchers list
 	etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex)
@@ -101,6 +101,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 	// wait for the notification for any changing
 	res := <-watcher.C
 
+	if res == nil {
+		return nil, fmt.Errorf("watcher is cleared")
+	}
+
 	return json.Marshal(res)
 }
 

+ 4 - 1
error.go

@@ -20,7 +20,7 @@ func init() {
 	errors[201] = "PrevValue is Required in POST form"
 	errors[202] = "The given TTL in POST form is not a number"
 	errors[203] = "The given index in POST form is not a number"
-	
+
 	// raft related errors
 	errors[300] = "Raft Internal Error"
 	errors[301] = "During Leader Election"
@@ -28,6 +28,9 @@ func init() {
 	// keyword
 	errors[400] = "The prefix of the given key is a keyword in etcd"
 
+	// etcd related errors
+	errors[500] = "watcher is cleared due to etcd recovery"
+
 }
 
 type jsonError struct {

+ 1 - 1
etcd.go

@@ -520,7 +520,7 @@ func getInfo(path string) *Info {
 
 		logPath := fmt.Sprintf("%s/log", path)
 		confPath := fmt.Sprintf("%s/conf", path)
-		snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
+		snapshotPath := fmt.Sprintf("%s/snapshot", path)
 		os.Remove(infoPath)
 		os.Remove(logPath)
 		os.Remove(confPath)

+ 5 - 5
store/keyword_test.go

@@ -5,30 +5,30 @@ import (
 )
 
 func TestKeywords(t *testing.T) {
-	keyword := CheckKeyword("machines")
+	keyword := CheckKeyword("_etcd")
 	if !keyword {
 		t.Fatal("machines should be keyword")
 	}
 
-	keyword = CheckKeyword("/machines")
+	keyword = CheckKeyword("/_etcd")
 
 	if !keyword {
 		t.Fatal("/machines should be keyword")
 	}
 
-	keyword = CheckKeyword("/machines/")
+	keyword = CheckKeyword("/_etcd/")
 
 	if !keyword {
 		t.Fatal("/machines/ contains keyword prefix")
 	}
 
-	keyword = CheckKeyword("/machines/node1")
+	keyword = CheckKeyword("/_etcd/node1")
 
 	if !keyword {
 		t.Fatal("/machines/* contains keyword prefix")
 	}
 
-	keyword = CheckKeyword("/nokeyword/machines/node1")
+	keyword = CheckKeyword("/nokeyword/_etcd/node1")
 
 	if keyword {
 		t.Fatal("this does not contain keyword prefix")

+ 9 - 4
store/store.go

@@ -29,7 +29,7 @@ type Store struct {
 	messager *chan string
 
 	// A map to keep the recent response to the clients
-	ResponseMap map[string]Response
+	ResponseMap map[string]*Response
 
 	// The max number of the recent responses we can record
 	ResponseMaxSize int
@@ -109,7 +109,7 @@ func CreateStore(max int) *Store {
 
 	s.messager = nil
 
-	s.ResponseMap = make(map[string]Response)
+	s.ResponseMap = make(map[string]*Response)
 	s.ResponseStartIndex = 0
 	s.ResponseMaxSize = max
 	s.ResponseCurrSize = 0
@@ -126,7 +126,7 @@ func CreateStore(max int) *Store {
 		},
 	}
 
-	s.watcher = createWatcherHub()
+	s.watcher = newWatcherHub()
 
 	return s
 }
@@ -502,7 +502,7 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) {
 	}
 
 	strIndex := strconv.FormatUint(index, 10)
-	s.ResponseMap[strIndex] = *resp
+	s.ResponseMap[strIndex] = resp
 
 	// unlimited
 	if s.ResponseMaxSize < 0 {
@@ -532,6 +532,11 @@ func (s *Store) Save() ([]byte, error) {
 
 // Recovery the state of the stroage system from a previous state
 func (s *Store) Recovery(state []byte) error {
+
+	// we need to stop all the current watchers
+	// recovery will clear watcherHub
+	s.watcher.stopWatchers()
+
 	err := json.Unmarshal(state, s)
 
 	// The only thing need to change after the recovery is the

+ 5 - 0
store/tree.go

@@ -57,6 +57,11 @@ func (t *tree) set(key string, value Node) bool {
 
 	nodesName := split(key)
 
+	// avoid set value to "/"
+	if len(nodesName) == 1 && len(nodesName[0]) == 0 {
+		return false
+	}
+
 	nodeMap := t.Root.NodeMap
 
 	i := 0

+ 18 - 7
store/watcher.go

@@ -19,24 +19,24 @@ type WatcherHub struct {
 
 // Currently watcher only contains a response channel
 type Watcher struct {
-	C chan Response
+	C chan *Response
 }
 
 // Create a new watcherHub
-func createWatcherHub() *WatcherHub {
+func newWatcherHub() *WatcherHub {
 	w := new(WatcherHub)
 	w.watchers = make(map[string][]*Watcher)
 	return w
 }
 
 // Create a new watcher
-func CreateWatcher() *Watcher {
-	return &Watcher{C: make(chan Response, 1)}
+func NewWatcher() *Watcher {
+	return &Watcher{C: make(chan *Response, 1)}
 }
 
 // Add a watcher to the watcherHub
 func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
-	responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error {
+	responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error {
 
 	prefix = path.Clean("/" + prefix)
 
@@ -65,7 +65,7 @@ func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint
 }
 
 // Check if the response has what we are watching
-func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool {
+func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool {
 
 	resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
 
@@ -104,7 +104,7 @@ func (w *WatcherHub) notify(resp Response) error {
 			newWatchers := make([]*Watcher, 0)
 			// notify all the watchers
 			for _, watcher := range watchers {
-				watcher.C <- resp
+				watcher.C <- &resp
 			}
 
 			if len(newWatchers) == 0 {
@@ -120,3 +120,14 @@ func (w *WatcherHub) notify(resp Response) error {
 
 	return nil
 }
+
+// stopWatchers stops all the watchers
+// This function is used when the etcd recovery from a snapshot at runtime
+func (w *WatcherHub) stopWatchers() {
+	for _, subWatchers := range w.watchers {
+		for _, watcher := range subWatchers {
+			watcher.C <- nil
+		}
+	}
+	w.watchers = nil
+}

+ 0 - 29
store/watcher_test.bak

@@ -1,29 +0,0 @@
-package store
-
-import (
-	"fmt"
-	"testing"
-	"time"
-)
-
-func TestWatch(t *testing.T) {
-	// watcher := createWatcher()
-	c := make(chan Response)
-	d := make(chan Response)
-	w.add("/", c)
-	go say(c)
-	w.add("/prefix/", d)
-	go say(d)
-	s.Set("/prefix/foo", "bar", time.Unix(0, 0))
-}
-
-func say(c chan Response) {
-	result := <-c
-
-	if result.Action != -1 {
-		fmt.Println("yes")
-	} else {
-		fmt.Println("no")
-	}
-
-}

+ 95 - 0
store/watcher_test.go

@@ -0,0 +1,95 @@
+package store
+
+import (
+	"math/rand"
+	"strconv"
+	"testing"
+	"time"
+)
+
+func TestWatch(t *testing.T) {
+
+	s := CreateStore(100)
+
+	watchers := make([]*Watcher, 10)
+
+	for i, _ := range watchers {
+
+		// create a new watcher
+		watchers[i] = NewWatcher()
+		// add to the watchers list
+		s.AddWatcher("foo", watchers[i], 0)
+
+	}
+
+	s.Set("/foo/foo", "bar", time.Unix(0, 0), 1)
+
+	for _, watcher := range watchers {
+
+		// wait for the notification for any changing
+		res := <-watcher.C
+
+		if res == nil {
+			t.Fatal("watcher is cleared")
+		}
+	}
+
+	for i, _ := range watchers {
+
+		// create a new watcher
+		watchers[i] = NewWatcher()
+		// add to the watchers list
+		s.AddWatcher("foo/foo/foo", watchers[i], 0)
+
+	}
+
+	s.watcher.stopWatchers()
+
+	for _, watcher := range watchers {
+
+		// wait for the notification for any changing
+		res := <-watcher.C
+
+		if res != nil {
+			t.Fatal("watcher is cleared")
+		}
+	}
+}
+
+// BenchmarkWatch creates 10K watchers watch at /foo/[paht] each time.
+// Path is randomly chosen with max depth 10.
+// It should take less than 15ms to wake up 10K watchers.
+func BenchmarkWatch(b *testing.B) {
+	s := CreateStore(100)
+
+	key := make([]string, 10000)
+	for i := 0; i < 10000; i++ {
+
+		key[i] = "/foo/"
+		depth := rand.Intn(10)
+
+		for j := 0; j < depth; j++ {
+			key[i] += "/" + strconv.Itoa(rand.Int()%10)
+		}
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		watchers := make([]*Watcher, 10000)
+		for i := 0; i < 10000; i++ {
+			// create a new watcher
+			watchers[i] = NewWatcher()
+			// add to the watchers list
+			s.AddWatcher(key[i], watchers[i], 0)
+		}
+
+		s.watcher.stopWatchers()
+
+		for _, watcher := range watchers {
+			// wait for the notification for any changing
+			<-watcher.C
+		}
+
+		s.watcher = newWatcherHub()
+	}
+}