Browse Source

watch accept sinceIndex

Xiang Li 12 years ago
parent
commit
e211554b91
4 changed files with 31 additions and 16 deletions
  1. 3 2
      command.go
  2. 4 2
      etcd.go
  3. 19 2
      handlers.go
  4. 5 10
      store/store.go

+ 3 - 2
command.go

@@ -78,7 +78,8 @@ func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
 
 // Watch command
 type WatchCommand struct {
-	Key string `json:"key"`
+	Key        string `json:"key"`
+	SinceIndex uint64 `json:"sinceIndex"`
 }
 
 //The name of the command in the log
@@ -90,7 +91,7 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 	ch := make(chan store.Response, 1)
 
 	// add to the watchers list
-	store.AddWatcher(c.Key, ch, 1)
+	store.AddWatcher(c.Key, ch, c.SinceIndex)
 
 	// wait for the notification for any changing
 	res := <-ch

+ 4 - 2
etcd.go

@@ -48,7 +48,7 @@ var dirPath string
 
 var ignore bool
 
-var responseBufferSize int
+var maxSize int
 
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
@@ -71,6 +71,8 @@ func init() {
 	flag.StringVar(&dirPath, "d", "./", "the directory to store log and snapshot")
 
 	flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node")
+
+	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 }
 
 // CONSTANTS
@@ -157,7 +159,7 @@ func main() {
 	serverTransHandler = createTranHandler(st)
 
 	// Setup new raft server.
-	s := store.GetStore()
+	s := store.CreateStore(maxSize)
 
 	// create raft server
 	server, err = raft.NewServer(name, dirPath, serverTransHandler, s, nil)

+ 19 - 2
handlers.go

@@ -220,11 +220,28 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
 func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/watch/"):]
 
-	debug("[recv] GET http://%v/watch/%s", server.Name(), key)
-
 	command := &WatchCommand{}
 	command.Key = key
 
+	if req.Method == "GET" {
+		debug("[recv] GET http://%v/watch/%s", server.Name(), key)
+		command.SinceIndex = 0
+
+	} else if req.Method == "POST" {
+		debug("[recv] POST http://%v/watch/%s", server.Name(), key)
+		content, err := ioutil.ReadAll(req.Body)
+
+		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+		}
+		command.SinceIndex = sinceIndex
+
+	} else {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+
 	if body, err := command.Apply(server); err != nil {
 		warn("raftd: Unable to write file: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)

+ 5 - 10
store/store.go

@@ -73,20 +73,15 @@ type Response struct {
 	Index uint64 `json:"index"`
 }
 
-func init() {
-	s = createStore()
-	s.messager = nil
-	s.ResponseStartIndex = 0
-	s.ResponseMaxSize = 1024
-	s.ResponseCurrSize = 0
-}
-
 // make a new stroe
-func createStore() *Store {
-	s := new(Store)
+func CreateStore(max int) *Store {
+	s = new(Store)
+	s.messager = nil
 	s.Nodes = make(map[string]Node)
 	s.ResponseMap = make(map[string]Response)
 	s.ResponseStartIndex = 0
+	s.ResponseMaxSize = max
+	s.ResponseCurrSize = 0
 	return s
 }