Browse Source

add watch function and distinguish sensetive and non-sentive command

Xiang Li 12 years ago
parent
commit
2e679d257c
6 changed files with 121 additions and 45 deletions
  1. 59 6
      command.go
  2. 33 7
      handlers.go
  3. 1 0
      raftd.go
  4. 16 14
      store.go
  5. 8 16
      watcher.go
  6. 4 2
      watcher_test.go

+ 59 - 6
command.go

@@ -19,6 +19,7 @@ type Command interface {
 	Type() string
 	GetValue() string
 	GetKey() string
+	Sensitive() bool
 }
 
 // Set command
@@ -38,22 +39,26 @@ func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
 	return json.Marshal(res)
 }
 
-func (c *SetCommand) GeneratePath() string{
+func (c *SetCommand) GeneratePath() string {
 	return "/set/" + c.Key
 }
 
-func (c *SetCommand) Type() string{
+func (c *SetCommand) Type() string {
 	return "POST"
 }
 
-func (c *SetCommand) GetValue() string{
+func (c *SetCommand) GetValue() string {
 	return c.Value
 }
 
-func (c *SetCommand) GetKey() string{
+func (c *SetCommand) GetKey() string {
 	return c.Key
 }
 
+func (c *SetCommand) Sensitive() bool {
+	return true
+}
+
 
 // Get command
 type GetCommand struct {
@@ -87,6 +92,9 @@ func (c *GetCommand) GetKey() string{
 	return c.Key
 }
 
+func (c *GetCommand) Sensitive() bool {
+	return false
+}
 
 // Delete command
 type DeleteCommand struct {
@@ -98,7 +106,7 @@ func (c *DeleteCommand) CommandName() string {
 	return "delete"
 }
 
-// Set the value of key to value
+// Delete the key 
 func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
 	res := s.Delete(c.Key)
 	return json.Marshal(res)
@@ -120,7 +128,52 @@ func (c *DeleteCommand) GetKey() string{
 	return c.Key
 }
 
-// joinCommand
+func (c *DeleteCommand) Sensitive() bool {
+	return true
+}
+
+
+// Watch command
+type WatchCommand struct {
+	Key string `json:"key"`
+}
+
+//The name of the command in the log
+func (c *WatchCommand) CommandName() string {
+	return "watch"
+}
+
+func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
+	ch := make(chan Response)
+
+	w.add(c.Key, ch)	
+
+	res := <- ch
+
+	return json.Marshal(res)
+}
+
+func (c *WatchCommand) GeneratePath() string{
+	return "/watch/" + c.Key
+}
+
+func (c *WatchCommand) Type() string{
+	return "GET"
+}
+
+func (c *WatchCommand) GetValue() string{
+	return ""
+}
+
+func (c *WatchCommand) GetKey() string{
+	return c.Key
+}
+
+func (c *WatchCommand) Sensitive() bool {
+	return false
+}
+
+// JoinCommand
 type JoinCommand struct {
 	Name string `json:"name"`
 }

+ 33 - 7
handlers.go

@@ -132,6 +132,18 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 
 
+func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+
+	debug("[recv] GET http://%v/watch/%s", server.Name(), vars["key"])
+
+	command := &WatchCommand{}
+	command.Key = vars["key"]
+
+	Dispatch(server, command, w)
+
+}
+
 func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 	var body []byte
 	var err error
@@ -142,15 +154,29 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 	for {
 		// i am the leader, i will take care of the command
 		if server.State() == "leader" {
-			fmt.Println("i am leader ", server.Name())
-			if body, err = server.Do(command); err != nil {
-				warn("raftd: Unable to write file: %v", err)
-				w.WriteHeader(http.StatusInternalServerError)
+			if command.Sensitive() {
+				if body, err = server.Do(command); err != nil {
+					warn("raftd: Unable to write file: %v", err)
+					w.WriteHeader(http.StatusInternalServerError)
+					return
+				} else {
+				// good to go
+					w.WriteHeader(http.StatusOK)
+					w.Write(body)
+					return
+				}
 			} else {
+				fmt.Println("non-sensitive")
+				if body, err = command.Apply(server); err != nil {
+					warn("raftd: Unable to write file: %v", err)
+					w.WriteHeader(http.StatusInternalServerError)
+					return
+				} else {
 				// good to go
-				w.WriteHeader(http.StatusOK)
-				w.Write(body)
-				return
+					w.WriteHeader(http.StatusOK)
+					w.Write(body)
+					return
+				}
 			}
 
 		// redirect the command to the current leader

+ 1 - 0
raftd.go

@@ -138,6 +138,7 @@ func main() {
     r.HandleFunc("/set/{key}", SetHttpHandler).Methods("POST")
     r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET")
     r.HandleFunc("/delete/{key}", DeleteHttpHandler).Methods("GET")
+    r.HandleFunc("/watch/{key}", WatchHttpHandler).Methods("GET")
 
     http.Handle("/", r)
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))

+ 16 - 14
store.go

@@ -3,13 +3,14 @@ package main
 import (
 	"path"
 	"encoding/json"
-	"fmt"
+	//"fmt"
 	)
 
 // CONSTANTS
 const (
-	ERROR = -(1 + iota)
+	ERROR = -1 + iota
 	SET 
+	GET
 	DELETE
 )
 
@@ -18,7 +19,10 @@ type Store struct {
 }
 
 type Response struct {
-	OldValue string `json:oldvalue`
+	Action	 int    `json:action`
+	Key      string `json:key`
+	OldValue string `json:oldValue`
+	NewValue string `json:newValue`
 	Exist 	 bool `json:exist`
 }
 
@@ -39,34 +43,32 @@ func createStore() *Store{
 
 // set the key to value, return the old value if the key exists 
 func (s *Store) Set(key string, value string) Response {
-	fmt.Println("Store SET")
 	key = path.Clean(key)
 
 	oldValue, ok := s.Nodes[key]
 
 	if ok {
 		s.Nodes[key] = value
-		w.notify(SET, key, oldValue, value)
-		return Response{oldValue, true}
+		w.notify(SET, key, oldValue, value, true)
+		return Response{SET, key, oldValue, value, true}
 
 	} else {
 		s.Nodes[key] = value
-		w.notify(SET, key, "", value)
-		return Response{"", false}
+		w.notify(SET, key, "", value, false)
+		return Response{SET, key, "", value, false}
 	}
 }
 
 // get the value of the key
 func (s *Store) Get(key string) Response {
-	fmt.Println("Stroe Get")
 	key = path.Clean(key)
 
 	value, ok := s.Nodes[key]
 
 	if ok {
-		return Response{value, true}
+		return Response{GET, key, value, value, true}
 	} else {
-		return Response{"", false}
+		return Response{GET, key, "", value, false}
 	}
 }
 
@@ -79,11 +81,11 @@ func (s *Store) Delete(key string) Response {
 	if ok {
 		delete(s.Nodes, key)
 
-		w.notify(DELETE, key, oldValue, "")
+		w.notify(DELETE, key, oldValue, "", true)
 
-		return Response{oldValue, true}
+		return Response{DELETE, key, oldValue, "", true}
 	} else {
-		return Response{"", false}
+		return Response{DELETE, key, "", "", false}
 	}
 }
 

+ 8 - 16
watcher.go

@@ -8,14 +8,7 @@ import (
 
 
 type Watcher struct {
-	chanMap map[string][]chan Notification
-}
-
-type Notification struct {
-	action int 
-	key	string
-	oldValue string
-	newValue string
+	chanMap map[string][]chan Response
 }
 
 // global watcher
@@ -29,19 +22,19 @@ func init() {
 // create a new watcher
 func createWatcher() *Watcher {
 	w := new(Watcher)
-	w.chanMap = make(map[string][]chan Notification)
+	w.chanMap = make(map[string][]chan Response)
 	return w
 }
 
 // register a function with channel and prefix to the watcher
-func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notification)) error {
+func (w *Watcher) add(prefix string, c chan Response) error {
 
-	prefix = path.Clean(prefix)
+	prefix = "/" + path.Clean(prefix)
 	fmt.Println("Add ", prefix)
 
 	_, ok := w.chanMap[prefix]
 	if !ok {
-		w.chanMap[prefix] = make([]chan Notification, 0)
+		w.chanMap[prefix] = make([]chan Response, 0)
 		w.chanMap[prefix] = append(w.chanMap[prefix], c)
 	} else {
 		w.chanMap[prefix] = append(w.chanMap[prefix], c)
@@ -49,14 +42,13 @@ func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notificati
 
 	fmt.Println(len(w.chanMap[prefix]), "@", prefix)
 
-	go f(c)
 	return nil
 }
 
 // notify the watcher a action happened
-func (w *Watcher) notify(action int, key string, oldValue string, newValue string) error {
+func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error {
 	key = path.Clean(key)
-
+	fmt.Println("notify")
 	segments := strings.Split(key, "/")
 
 	currPath := "/"
@@ -73,7 +65,7 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin
 		if ok {
 			fmt.Println("found ", currPath)
 
-			n := Notification {action, key, oldValue, newValue}
+			n := Response {action, key, oldValue, newValue, exist}
 			// notify all the watchers
 			for _, c := range chans {
 				c <- n

+ 4 - 2
watcher_test.go

@@ -9,8 +9,10 @@ func TestWatch(t *testing.T) {
 	// watcher := createWatcher()
 	c := make(chan Notification)
 	d := make(chan Notification)
-	w.add("/", c, say)
-	w.add("/prefix/", d, say)
+	w.add("/", c)
+	go say(c)
+	w.add("/prefix/", d)
+	go say(d)
 	s.Set("/prefix/foo", "bar")
 }