Browse Source

Merge pull request #2 from xiangli-cmu/addIndex

Add index
Xiang Li 12 years ago
parent
commit
56244a6d40
3 changed files with 110 additions and 30 deletions
  1. 4 4
      command.go
  2. 68 14
      store/store.go
  3. 38 12
      store/watcher.go

+ 4 - 4
command.go

@@ -33,7 +33,7 @@ func (c *SetCommand) CommandName() string {
 
 
 // Set the value of key to value
 // Set the value of key to value
 func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
 func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
-	return store.Set(c.Key, c.Value, c.ExpireTime)
+	return store.Set(c.Key, c.Value, c.ExpireTime, server.CommittedIndex())
 }
 }
 
 
 // Get the path for http request
 // Get the path for http request
@@ -73,7 +73,7 @@ func (c *DeleteCommand) CommandName() string {
 
 
 // Delete the key
 // Delete the key
 func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
 func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
-	return store.Delete(c.Key)
+	return store.Delete(c.Key, server.CommittedIndex())
 }
 }
 
 
 // Watch command
 // Watch command
@@ -87,10 +87,10 @@ func (c *WatchCommand) CommandName() string {
 }
 }
 
 
 func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
-	ch := make(chan store.Response)
+	ch := make(chan store.Response, 1)
 
 
 	// add to the watchers list
 	// add to the watchers list
-	store.AddWatcher(c.Key, ch, 0)
+	store.AddWatcher(c.Key, ch, 1)
 
 
 	// wait for the notification for any changing
 	// wait for the notification for any changing
 	res := <-ch
 	res := <-ch

+ 68 - 14
store/store.go

@@ -27,6 +27,15 @@ type Store struct {
 	// the string channel to send messages to the outside world
 	// the string channel to send messages to the outside world
 	// now we use it to send changes to the hub of the web service
 	// now we use it to send changes to the hub of the web service
 	messager *chan string
 	messager *chan string
+
+	// previous responses
+	Responses []Response
+
+	// at some point, we may need to compact the Response
+	ResponseStartIndex uint64
+
+	// current Index
+	Index uint64
 }
 }
 
 
 type Node struct {
 type Node struct {
@@ -43,14 +52,19 @@ type Node struct {
 type Response struct {
 type Response struct {
 	Action   int    `json:"action"`
 	Action   int    `json:"action"`
 	Key      string `json:"key"`
 	Key      string `json:"key"`
-	OldValue string `json:"oldValue"`
-	NewValue string `json:"newValue"`
+	PrevValue string `json:"prevValue"`
+	Value string `json:"Value"`
 
 
 	// if the key existed before the action, this field should be true
 	// if the key existed before the action, this field should be true
 	// if the key did not exist before the action, this field should be false
 	// if the key did not exist before the action, this field should be false
 	Exist bool `json:"exist"`
 	Exist bool `json:"exist"`
 
 
 	Expiration time.Time `json:"expiration"`
 	Expiration time.Time `json:"expiration"`
+
+	// countdown until expiration in seconds
+	TTL int64 `json:"TTL"`
+
+	Index uint64 `json:"index"`
 }
 }
 
 
 func init() {
 func init() {
@@ -62,6 +76,8 @@ func init() {
 func createStore() *Store {
 func createStore() *Store {
 	s := new(Store)
 	s := new(Store)
 	s.Nodes = make(map[string]Node)
 	s.Nodes = make(map[string]Node)
+	s.Responses = make([]Response, 0)
+	s.ResponseStartIndex = 0
 	return s
 	return s
 }
 }
 
 
@@ -76,7 +92,12 @@ func (s *Store) SetMessager(messager *chan string) {
 }
 }
 
 
 // set the key to value, return the old value if the key exists
 // set the key to value, return the old value if the key exists
-func Set(key string, value string, expireTime time.Time) ([]byte, error) {
+func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
+
+	//update index
+	s.Index = index
+	
+	key = "/" + key
 
 
 	key = path.Clean(key)
 	key = path.Clean(key)
 
 
@@ -88,7 +109,15 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 	// the key may be expired, we should not add the node
 	// the key may be expired, we should not add the node
 	// also if the node exist, we need to delete the node
 	// also if the node exist, we need to delete the node
 	if isExpire && expireTime.Sub(time.Now()) < 0 {
 	if isExpire && expireTime.Sub(time.Now()) < 0 {
-		return Delete(key)
+		return Delete(key, index)
+	}
+
+	var TTL int64
+	// update ttl
+	if isExpire {
+		TTL = int64(expireTime.Sub(time.Now()) / time.Second)
+	} else {
+		TTL = -1
 	}
 	}
 
 
 	// get the node
 	// get the node
@@ -108,12 +137,13 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 				node.update = make(chan time.Time)
 				node.update = make(chan time.Time)
 				go expire(key, node.update, expireTime)
 				go expire(key, node.update, expireTime)
 			}
 			}
+			
 		}
 		}
 
 
 		// update the information of the node
 		// update the information of the node
 		s.Nodes[key] = Node{value, expireTime, node.update}
 		s.Nodes[key] = Node{value, expireTime, node.update}
 
 
-		resp := Response{SET, key, node.Value, value, true, expireTime}
+		resp := Response{SET, key, node.Value, value, true, expireTime, TTL, index}
 
 
 		msg, err := json.Marshal(resp)
 		msg, err := json.Marshal(resp)
 
 
@@ -125,9 +155,11 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
 
 
+		s.Responses = append(s.Responses, resp)
+
 		return msg, err
 		return msg, err
 
 
-		// add new node
+	// add new node
 	} else {
 	} else {
 
 
 		update := make(chan time.Time)
 		update := make(chan time.Time)
@@ -138,7 +170,7 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 			go expire(key, update, expireTime)
 			go expire(key, update, expireTime)
 		}
 		}
 
 
-		resp := Response{SET, key, "", value, false, expireTime}
+		resp := Response{SET, key, "", value, false, expireTime, TTL, index}
 
 
 		msg, err := json.Marshal(resp)
 		msg, err := json.Marshal(resp)
 
 
@@ -150,7 +182,9 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 
 
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
-
+		
+		s.Responses = append(s.Responses, resp)
+		fmt.Println(len(s.Responses), " ")
 		return msg, err
 		return msg, err
 	}
 	}
 }
 }
@@ -170,7 +204,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
 
 
 				delete(s.Nodes, key)
 				delete(s.Nodes, key)
 
 
-				resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
+				resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
 
 
 				msg, err := json.Marshal(resp)
 				msg, err := json.Marshal(resp)
 
 
@@ -207,14 +241,30 @@ func Get(key string) Response {
 	node, ok := s.Nodes[key]
 	node, ok := s.Nodes[key]
 
 
 	if ok {
 	if ok {
-		return Response{GET, key, node.Value, node.Value, true, node.ExpireTime}
+		var TTL int64
+		var isExpire bool = false
+
+		isExpire = !node.ExpireTime.Equal(PERMANENT)
+
+		// update ttl
+		if isExpire {
+			TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
+		} else {
+			TTL = -1
+		}
+
+		return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
 	} else {
 	} else {
-		return Response{GET, key, "", "", false, time.Unix(0, 0)}
+
+		return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index}
 	}
 	}
 }
 }
 
 
 // delete the key
 // delete the key
-func Delete(key string) ([]byte, error) {
+func Delete(key string, index uint64) ([]byte, error) {
+	//update index
+	s.Index = index
+
 	key = path.Clean(key)
 	key = path.Clean(key)
 
 
 	node, ok := s.Nodes[key]
 	node, ok := s.Nodes[key]
@@ -233,7 +283,7 @@ func Delete(key string) ([]byte, error) {
 
 
 		}
 		}
 
 
-		resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
+		resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, index}
 
 
 		msg, err := json.Marshal(resp)
 		msg, err := json.Marshal(resp)
 
 
@@ -245,11 +295,15 @@ func Delete(key string) ([]byte, error) {
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
 
 
+		s.Responses = append(s.Responses, resp)
 		return msg, err
 		return msg, err
 
 
 	} else {
 	} else {
 
 
-		return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)})
+		resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}
+		s.Responses = append(s.Responses, resp)
+
+		return json.Marshal(resp)
 	}
 	}
 }
 }
 
 

+ 38 - 12
store/watcher.go

@@ -3,11 +3,7 @@ package store
 import (
 import (
 	"path"
 	"path"
 	"strings"
 	"strings"
-)
-
-const (
-	SHORT = iota
-	LONG
+	"fmt"
 )
 )
 
 
 type WatcherHub struct {
 type WatcherHub struct {
@@ -16,7 +12,6 @@ type WatcherHub struct {
 
 
 type Watcher struct {
 type Watcher struct {
 	c     chan Response
 	c     chan Response
-	wType int
 }
 }
 
 
 // global watcher
 // global watcher
@@ -39,22 +34,32 @@ func GetWatcherHub() *WatcherHub {
 }
 }
 
 
 // register a function with channel and prefix to the watcher
 // register a function with channel and prefix to the watcher
-func AddWatcher(prefix string, c chan Response, wType int) error {
+func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
 
 
 	prefix = "/" + path.Clean(prefix)
 	prefix = "/" + path.Clean(prefix)
 
 
+	if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
+
+		for i := sinceIndex; i < s.Index; i++ {
+			if check(prefix, i) {
+				c <- s.Responses[i]
+				return nil
+			}
+		}
+	}
+
 	_, ok := w.watchers[prefix]
 	_, ok := w.watchers[prefix]
 
 
 	if !ok {
 	if !ok {
 
 
 		w.watchers[prefix] = make([]Watcher, 0)
 		w.watchers[prefix] = make([]Watcher, 0)
 
 
-		watcher := Watcher{c, wType}
+		watcher := Watcher{c}
 
 
 		w.watchers[prefix] = append(w.watchers[prefix], watcher)
 		w.watchers[prefix] = append(w.watchers[prefix], watcher)
 	} else {
 	} else {
 
 
-		watcher := Watcher{c, wType}
+		watcher := Watcher{c}
 
 
 		w.watchers[prefix] = append(w.watchers[prefix], watcher)
 		w.watchers[prefix] = append(w.watchers[prefix], watcher)
 	}
 	}
@@ -62,6 +67,30 @@ func AddWatcher(prefix string, c chan Response, wType int) error {
 	return nil
 	return nil
 }
 }
 
 
+// check if the response has what we are waching
+func check(prefix string, index uint64) bool {
+
+	index = index - s.ResponseStartIndex
+
+	if index < 0 {
+		return false
+	}
+
+	path := s.Responses[index].Key
+	fmt.Println("checking ", path, " ", prefix)
+	if strings.HasPrefix(path, prefix) {
+		fmt.Println("checking found")
+		prefixLen := len(prefix)
+		if len(path) == prefixLen || path[prefixLen] == '/' {
+			return true
+		}
+
+	}
+
+	return false
+}
+
+
 // notify the watcher a action happened
 // notify the watcher a action happened
 func notify(resp Response) error {
 func notify(resp Response) error {
 	resp.Key = path.Clean(resp.Key)
 	resp.Key = path.Clean(resp.Key)
@@ -81,9 +110,6 @@ func notify(resp Response) error {
 			// notify all the watchers
 			// notify all the watchers
 			for _, watcher := range watchers {
 			for _, watcher := range watchers {
 				watcher.c <- resp
 				watcher.c <- resp
-				if watcher.wType == LONG {
-					newWatchers = append(newWatchers, watcher)
-				}
 			}
 			}
 
 
 			if len(newWatchers) == 0 {
 			if len(newWatchers) == 0 {