Xiang Li 12 роки тому
батько
коміт
1caf2a3364
19 змінених файлів з 54 додано та 1890 видалено
  1. 2 19
      command.go
  2. 0 3
      etcd.go
  3. 20 25
      etcd_handlers.go
  4. 6 6
      file_system/file_system.go
  5. 6 2
      machines.go
  6. 7 3
      name_url_map.go
  7. 1 2
      raft_server.go
  8. 5 5
      snapshot.go
  9. 0 37
      store/keyword_test.go
  10. 0 33
      store/keywords.go
  11. 0 33
      store/stats.go
  12. 0 663
      store/store.go
  13. 0 258
      store/store_test.go
  14. 0 21
      store/test.go
  15. 0 318
      store/tree.go
  16. 0 247
      store/tree_store_test.go
  17. 0 129
      store/watcher.go
  18. 0 84
      store/watcher_test.go
  19. 7 2
      util.go

+ 2 - 19
command.go

@@ -73,23 +73,6 @@ func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
 	return json.Marshal(e)
 }
 
-// Set command
-type SetCommand struct {
-	Key        string    `json:"key"`
-	Value      string    `json:"value"`
-	ExpireTime time.Time `json:"expireTime"`
-}
-
-// The name of the set command in the log
-func (c *SetCommand) CommandName() string {
-	return commandName("set")
-}
-
-// Set the key-value pair
-func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
-	return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
-}
-
 // TestAndSet command
 type TestAndSetCommand struct {
 	Key        string    `json:"key"`
@@ -240,7 +223,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
 	etcdFs.Create(key, value, fileSystem.Permanent, raftServer.CommitIndex(), raftServer.Term())
 
-	if c.Name != r.Name() {
+	if c.Name != r.Name() { // do not add self to the peer list
 		r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
 	}
 
@@ -267,7 +250,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	// remove machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)
 
-	_, err := etcdStore.Delete(key, raftServer.CommitIndex())
+	_, err := etcdFs.Delete(key, false, raftServer.CommitIndex(), raftServer.Term())
 	delete(r.peersStats, c.Name)
 
 	if err != nil {

+ 0 - 3
etcd.go

@@ -11,7 +11,6 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/file_system"
-	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 )
 
@@ -137,7 +136,6 @@ type TLSConfig struct {
 //
 //------------------------------------------------------------------------------
 
-var etcdStore *store.Store
 var etcdFs *fileSystem.FileSystem
 
 //------------------------------------------------------------------------------
@@ -206,7 +204,6 @@ func main() {
 	info := getInfo(dirPath)
 
 	// Create etcd key-value store
-	etcdStore = store.CreateStore(maxSize)
 	etcdFs = fileSystem.New()
 
 	snapConf = newSnapshotConf()

+ 20 - 25
etcd_handlers.go

@@ -7,6 +7,7 @@ import (
 	"strings"
 
 	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/file_system"
 	"github.com/coreos/go-raft"
 )
 
@@ -83,17 +84,13 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error {
 //--------------------------------------
 
 func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v2/keys"):]
+	key := getNodePath(req.URL.Path)
 
 	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
-	req.ParseForm()
-
-	value := req.Form.Get("value")
-
-	ttl := req.FormValue("ttl")
+	value := req.FormValue("value")
 
-	expireTime, err := durationToExpireTime(ttl)
+	expireTime, err := durationToExpireTime(req.FormValue("ttl"))
 
 	if err != nil {
 		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create")
@@ -110,22 +107,20 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 }
 
 func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v2/keys"):]
+	key := getNodePath(req.URL.Path)
 
 	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 	value := req.FormValue("value")
 
-	ttl := req.FormValue("ttl")
-
-	expireTime, err := durationToExpireTime(ttl)
+	expireTime, err := durationToExpireTime(req.FormValue("ttl"))
 
 	if err != nil {
 		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update")
 	}
 
 	// TODO: update should give at least one option
-	if value == "" && ttl == "" {
+	if value == "" && expireTime.Sub(fileSystem.Permanent) == 0 {
 		return nil
 	}
 
@@ -168,7 +163,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 // Delete Handler
 func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v2/keys"):]
+	key := getNodePath(req.URL.Path)
 
 	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
@@ -228,7 +223,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 // Handler to return the basic stats of etcd
 func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	w.WriteHeader(http.StatusOK)
-	w.Write(etcdStore.Stats())
+	//w.Write(etcdStore.Stats())
 	w.Write(r.Stats())
 	return nil
 }
@@ -236,10 +231,18 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	var err error
 	var event interface{}
-	key := req.URL.Path[len("/v1/keys"):]
-
 	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
+	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
+		// help client to redirect the request to the current leader
+		leader := r.Leader()
+		url, _ := nameToEtcdURL(leader)
+		redirect(url, w, req)
+		return nil
+	}
+
+	key := getNodePath(req.URL.Path)
+
 	recursive := req.FormValue("recursive")
 
 	if req.FormValue("wait") == "true" { // watch
@@ -267,15 +270,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 	} else { //get
 
-		if req.FormValue("consistent") == "true" {
-			if r.State() != raft.Leader {
-				leader := r.Leader()
-				url, _ := nameToEtcdURL(leader)
-				redirect(url, w, req)
-				return nil
-			}
-		}
-
 		command := &GetCommand{
 			Key: key,
 		}
@@ -295,6 +289,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 	if err != nil {
 		return err
+
 	} else {
 		event, _ := event.([]byte)
 		w.WriteHeader(http.StatusOK)

+ 6 - 6
file_system/file_system.go

@@ -332,30 +332,30 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
 // Save function will not be able to save the state of watchers.
 // Save function will not save the parent field of the node. Or there will
 // be cyclic dependencies issue for the json package.
-func (fs *FileSystem) Save() []byte {
+func (fs *FileSystem) Save() ([]byte, error) {
 	cloneFs := New()
 	cloneFs.Root = fs.Root.Clone()
 
 	b, err := json.Marshal(fs)
 
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 
-	return b
+	return b, nil
 }
 
 // recovery function recovery the store system from a static state.
 // It needs to recovery the parent field of the nodes.
 // It needs to delete the expired nodes since the saved time and also
 // need to create monitor go routines.
-func (fs *FileSystem) Recover(state []byte) {
+func (fs *FileSystem) Recovery(state []byte) error {
 	err := json.Unmarshal(state, fs)
 
 	if err != nil {
-		panic(err)
+		return err
 	}
 
 	fs.Root.recoverAndclean()
-
+	return nil
 }

+ 6 - 2
machines.go

@@ -2,9 +2,13 @@ package main
 
 // machineNum returns the number of machines in the cluster
 func machineNum() int {
-	response, _ := etcdStore.RawGet("_etcd/machines")
+	e, err := etcdFs.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term())
 
-	return len(response)
+	if err != nil {
+		return 0
+	}
+
+	return len(e.KVPairs)
 }
 
 // getMachines gets the current machines in the cluster

+ 7 - 3
name_url_map.go

@@ -49,16 +49,20 @@ func addNameToURL(name string, version string, raftURL string, etcdURL string) {
 }
 
 func readURL(nodeName string, urlName string) (string, bool) {
-	// if fails, try to recover from etcd storage
+	if nodeName == "" {
+		return "", false
+	}
+
+	// convert nodeName to url from etcd storage
 	key := path.Join("/_etcd/machines", nodeName)
 
-	resps, err := etcdStore.RawGet(key)
+	e, err := etcdFs.Get(key, false, false, r.CommitIndex(), r.Term())
 
 	if err != nil {
 		return "", false
 	}
 
-	m, err := url.ParseQuery(resps[0].Value)
+	m, err := url.ParseQuery(e.Value)
 
 	if err != nil {
 		panic("Failed to parse machines entry")

+ 1 - 2
raft_server.go

@@ -36,7 +36,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout)
 
 	// Create raft server
-	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
+	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdFs, nil)
 
 	check(err)
 
@@ -312,7 +312,6 @@ func (r *raftServer) Stats() []byte {
 func registerCommands() {
 	raft.RegisterCommand(&JoinCommand{})
 	raft.RegisterCommand(&RemoveCommand{})
-	raft.RegisterCommand(&SetCommand{})
 	raft.RegisterCommand(&GetCommand{})
 	raft.RegisterCommand(&DeleteCommand{})
 	raft.RegisterCommand(&WatchCommand{})

+ 5 - 5
snapshot.go

@@ -20,17 +20,17 @@ var snapConf *snapshotConf
 
 func newSnapshotConf() *snapshotConf {
 	// check snapshot every 3 seconds and the threshold is 20K
-	return &snapshotConf{time.Second * 3, etcdStore.TotalWrites(), 20 * 1000}
+	return &snapshotConf{time.Second * 3, 0, 20 * 1000}
 }
 
 func monitorSnapshot() {
 	for {
 		time.Sleep(snapConf.checkingInterval)
-		currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
-
-		if currentWrites > snapConf.writesThr {
+		//currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
+		currentWrites := 0
+		if uint64(currentWrites) > snapConf.writesThr {
 			r.TakeSnapshot()
-			snapConf.lastWrites = etcdStore.TotalWrites()
+			snapConf.lastWrites = 0
 		}
 	}
 }

+ 0 - 37
store/keyword_test.go

@@ -1,37 +0,0 @@
-package store
-
-import (
-	"testing"
-)
-
-func TestKeywords(t *testing.T) {
-	keyword := CheckKeyword("_etcd")
-	if !keyword {
-		t.Fatal("_etcd should be keyword")
-	}
-
-	keyword = CheckKeyword("/_etcd")
-
-	if !keyword {
-		t.Fatal("/_etcd should be keyword")
-	}
-
-	keyword = CheckKeyword("/_etcd/")
-
-	if !keyword {
-		t.Fatal("/_etcd/ contains keyword prefix")
-	}
-
-	keyword = CheckKeyword("/_etcd/node1")
-
-	if !keyword {
-		t.Fatal("/_etcd/* contains keyword prefix")
-	}
-
-	keyword = CheckKeyword("/nokeyword/_etcd/node1")
-
-	if keyword {
-		t.Fatal("this does not contain keyword prefix")
-	}
-
-}

+ 0 - 33
store/keywords.go

@@ -1,33 +0,0 @@
-package store
-
-import (
-	"path"
-	"strings"
-)
-
-// keywords for internal useage
-// Key for string keyword; Value for only checking prefix
-var keywords = map[string]bool{
-	"/_etcd":          true,
-	"/ephemeralNodes": true,
-}
-
-// CheckKeyword will check if the key contains the keyword.
-// For now, we only check for prefix.
-func CheckKeyword(key string) bool {
-	key = path.Clean("/" + key)
-
-	// find the second "/"
-	i := strings.Index(key[1:], "/")
-
-	var prefix string
-
-	if i == -1 {
-		prefix = key
-	} else {
-		prefix = key[:i+1]
-	}
-	_, ok := keywords[prefix]
-
-	return ok
-}

+ 0 - 33
store/stats.go

@@ -1,33 +0,0 @@
-package store
-
-import (
-	"encoding/json"
-)
-
-type EtcdStats struct {
-	// Number of get requests
-	Gets uint64 `json:"gets"`
-
-	// Number of sets requests
-	Sets uint64 `json:"sets"`
-
-	// Number of delete requests
-	Deletes uint64 `json:"deletes"`
-
-	// Number of testAndSet requests
-	TestAndSets uint64 `json:"testAndSets"`
-}
-
-// Stats returns the basic statistics information of etcd storage since its recent start
-func (s *Store) Stats() []byte {
-	b, _ := json.Marshal(s.BasicStats)
-	return b
-}
-
-// TotalWrites returns the total write operations
-// It helps with snapshot
-func (s *Store) TotalWrites() uint64 {
-	bs := s.BasicStats
-
-	return bs.Deletes + bs.Sets + bs.TestAndSets
-}

+ 0 - 663
store/store.go

@@ -1,663 +0,0 @@
-package store
-
-import (
-	"encoding/json"
-	"fmt"
-	"path"
-	"strconv"
-	"sync"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-)
-
-//------------------------------------------------------------------------------
-//
-// Typedefs
-//
-//------------------------------------------------------------------------------
-
-// The main struct of the Key-Value store
-type Store struct {
-
-	// key-value store structure
-	Tree *tree
-
-	// This mutex protects everything except add watcher member.
-	// Add watch member does not depend on the current state of the store.
-	// And watch will return when other protected function is called and reach
-	// the watching condition.
-	// It is needed so that clone() can atomically replicate the Store
-	// and do the log snapshot in a go routine.
-	mutex sync.RWMutex
-
-	// WatcherHub is where we register all the clients
-	// who issue a watch request
-	watcher *WatcherHub
-
-	// The string channel to send messages to the outside world
-	// Now we use it to send changes to the hub of the web service
-	messager chan<- string
-
-	// A map to keep the recent response to the clients
-	ResponseMap map[string]*Response
-
-	// The max number of the recent responses we can record
-	ResponseMaxSize int
-
-	// The current number of the recent responses we have recorded
-	ResponseCurrSize uint
-
-	// The index of the first recent responses we have
-	ResponseStartIndex uint64
-
-	// Current index of the raft machine
-	Index uint64
-
-	// Basic statistics information of etcd storage
-	BasicStats EtcdStats
-}
-
-// A Node represents a Value in the Key-Value pair in the store
-// It has its value, expire time and a channel used to update the
-// expire time (since we do countdown in a go routine, we need to
-// communicate with it via channel)
-type Node struct {
-	// The string value of the node
-	Value string `json:"value"`
-
-	// If the node is a permanent one the ExprieTime will be Unix(0,0)
-	// Otherwise after the expireTime, the node will be deleted
-	ExpireTime time.Time `json:"expireTime"`
-
-	// A channel to update the expireTime of the node
-	update chan time.Time `json:"-"`
-}
-
-// The response from the store to the user who issue a command
-type Response struct {
-	Action    string `json:"action"`
-	Key       string `json:"key"`
-	Dir       bool   `json:"dir,omitempty"`
-	PrevValue string `json:"prevValue,omitempty"`
-	Value     string `json:"value,omitempty"`
-
-	// If the key did not exist before the action,
-	// this field should be set to true
-	NewKey bool `json:"newKey,omitempty"`
-
-	Expiration *time.Time `json:"expiration,omitempty"`
-
-	// Time to live in second
-	TTL int64 `json:"ttl,omitempty"`
-
-	// The command index of the raft machine when the command is executed
-	Index uint64 `json:"index"`
-}
-
-// A listNode represent the simplest Key-Value pair with its type
-// It is only used when do list opeartion
-// We want to have a file system like store, thus we distingush "file"
-// and "directory"
-type ListNode struct {
-	Key   string
-	Value string
-	Type  string
-}
-
-var PERMANENT = time.Unix(0, 0)
-
-//------------------------------------------------------------------------------
-//
-// Methods
-//
-//------------------------------------------------------------------------------
-
-// Create a new stroe
-// Arguement max is the max number of response we want to record
-func CreateStore(max int) *Store {
-	s := new(Store)
-
-	s.messager = nil
-
-	s.ResponseMap = make(map[string]*Response)
-	s.ResponseStartIndex = 0
-	s.ResponseMaxSize = max
-	s.ResponseCurrSize = 0
-
-	s.Tree = &tree{
-		&treeNode{
-			Node{
-				"/",
-				time.Unix(0, 0),
-				nil,
-			},
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	s.watcher = newWatcherHub()
-
-	return s
-}
-
-// Set the messager of the store
-func (s *Store) SetMessager(messager chan<- string) {
-	s.messager = messager
-}
-
-func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	return s.internalSet(key, value, expireTime, index)
-
-}
-
-// Set the key to value with expiration time
-func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
-	//Update index
-	s.Index = index
-
-	//Update stats
-	s.BasicStats.Sets++
-
-	key = path.Clean("/" + key)
-
-	isExpire := !expireTime.Equal(PERMANENT)
-
-	// base response
-	resp := Response{
-		Action: "SET",
-		Key:    key,
-		Value:  value,
-		Index:  index,
-	}
-
-	// When the slow follower receive the set command
-	// the key may be expired, we should not add the node
-	// also if the node exist, we need to delete the node
-	if isExpire && expireTime.Sub(time.Now()) < 0 {
-		return s.internalDelete(key, index)
-	}
-
-	var TTL int64
-
-	// Update ttl
-	if isExpire {
-		TTL = int64(expireTime.Sub(time.Now()) / time.Second)
-		resp.Expiration = &expireTime
-		resp.TTL = TTL
-	}
-
-	// Get the node
-	node, ok := s.Tree.get(key)
-
-	if ok {
-		// Update when node exists
-
-		// Node is not permanent
-		if !node.ExpireTime.Equal(PERMANENT) {
-
-			// If node is not permanent
-			// Update its expireTime
-			node.update <- expireTime
-
-		} else {
-
-			// If we want the permanent node to have expire time
-			// We need to create a go routine with a channel
-			if isExpire {
-				node.update = make(chan time.Time)
-				go s.monitorExpiration(key, node.update, expireTime)
-			}
-
-		}
-
-		// Update the information of the node
-		s.Tree.set(key, Node{value, expireTime, node.update})
-
-		resp.PrevValue = node.Value
-
-		s.watcher.notify(resp)
-
-		msg, err := json.Marshal(resp)
-
-		// Send to the messager
-		if s.messager != nil && err == nil {
-			s.messager <- string(msg)
-		}
-
-		s.addToResponseMap(index, &resp)
-
-		return msg, err
-
-		// Add new node
-	} else {
-
-		update := make(chan time.Time)
-
-		ok := s.Tree.set(key, Node{value, expireTime, update})
-
-		if !ok {
-			return nil, etcdErr.NewError(102, "set: "+key)
-		}
-
-		if isExpire {
-			go s.monitorExpiration(key, update, expireTime)
-		}
-
-		resp.NewKey = true
-
-		msg, err := json.Marshal(resp)
-
-		// Nofity the watcher
-		s.watcher.notify(resp)
-
-		// Send to the messager
-		if s.messager != nil && err == nil {
-			s.messager <- string(msg)
-		}
-
-		s.addToResponseMap(index, &resp)
-		return msg, err
-	}
-
-}
-
-// Get the value of the key and return the raw response
-func (s *Store) internalGet(key string) *Response {
-
-	key = path.Clean("/" + key)
-
-	node, ok := s.Tree.get(key)
-
-	if ok {
-		var TTL int64
-		var isExpire bool = false
-
-		isExpire = !node.ExpireTime.Equal(PERMANENT)
-
-		resp := &Response{
-			Action: "GET",
-			Key:    key,
-			Value:  node.Value,
-			Index:  s.Index,
-		}
-
-		// Update ttl
-		if isExpire {
-			TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
-			resp.Expiration = &node.ExpireTime
-			resp.TTL = TTL
-		}
-
-		return resp
-
-	} else {
-		// we do not found the key
-		return nil
-	}
-}
-
-// Get all the items under key
-// If key is a file return the file
-// If key is a directory reuturn an array of files
-func (s *Store) Get(key string) ([]byte, error) {
-	s.mutex.RLock()
-	defer s.mutex.RUnlock()
-
-	resps, err := s.RawGet(key)
-
-	if err != nil {
-		return nil, err
-	}
-
-	key = path.Clean("/" + key)
-
-	// If the number of resps == 1 and the response key
-	// is the key we query, a signal key-value should
-	// be returned
-	if len(resps) == 1 && resps[0].Key == key {
-		return json.Marshal(resps[0])
-	}
-
-	return json.Marshal(resps)
-}
-
-func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) {
-	resps := make([]*Response, 1)
-
-	isExpire := !node.ExpireTime.Equal(PERMANENT)
-
-	resps[0] = &Response{
-		Action: "GET",
-		Index:  s.Index,
-		Key:    key,
-		Value:  node.Value,
-	}
-
-	// Update ttl
-	if isExpire {
-		TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
-		resps[0].Expiration = &node.ExpireTime
-		resps[0].TTL = TTL
-	}
-
-	return resps, nil
-}
-
-func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) {
-	resps := make([]*Response, len(nodes))
-
-	// TODO: check if nodes and keys are the same length
-	for i := 0; i < len(nodes); i++ {
-		var TTL int64
-		var isExpire bool = false
-
-		isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
-
-		resps[i] = &Response{
-			Action: "GET",
-			Index:  s.Index,
-			Key:    path.Join(key, keys[i]),
-		}
-
-		if len(nodes[i].Value) != 0 {
-			resps[i].Value = nodes[i].Value
-		} else {
-			resps[i].Dir = true
-		}
-
-		// Update ttl
-		if isExpire {
-			TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
-			resps[i].Expiration = &nodes[i].ExpireTime
-			resps[i].TTL = TTL
-		}
-
-	}
-
-	return resps, nil
-}
-
-func (s *Store) RawGet(key string) ([]*Response, error) {
-	// Update stats
-	s.BasicStats.Gets++
-
-	key = path.Clean("/" + key)
-
-	nodes, keys, ok := s.Tree.list(key)
-	if !ok {
-		return nil, etcdErr.NewError(100, "get: "+key)
-	}
-
-	switch node := nodes.(type) {
-	case *Node:
-		return s.rawGetNode(key, node)
-	case []*Node:
-		return s.rawGetNodeList(key, keys, node)
-	default:
-		panic("invalid cast ")
-	}
-}
-
-func (s *Store) Delete(key string, index uint64) ([]byte, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return s.internalDelete(key, index)
-}
-
-// Delete the key
-func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
-
-	// Update stats
-	s.BasicStats.Deletes++
-
-	key = path.Clean("/" + key)
-
-	// Update index
-	s.Index = index
-
-	node, ok := s.Tree.get(key)
-
-	if !ok {
-		return nil, etcdErr.NewError(100, "delete: "+key)
-	}
-
-	resp := Response{
-		Action:    "DELETE",
-		Key:       key,
-		PrevValue: node.Value,
-		Index:     index,
-	}
-
-	if node.ExpireTime.Equal(PERMANENT) {
-
-		s.Tree.delete(key)
-
-	} else {
-		resp.Expiration = &node.ExpireTime
-		// Kill the expire go routine
-		node.update <- PERMANENT
-		s.Tree.delete(key)
-
-	}
-
-	msg, err := json.Marshal(resp)
-
-	s.watcher.notify(resp)
-
-	// notify the messager
-	if s.messager != nil && err == nil {
-		s.messager <- string(msg)
-	}
-
-	s.addToResponseMap(index, &resp)
-
-	return msg, err
-}
-
-// Set the value of the key to the value if the given prevValue is equal to the value of the key
-func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	// Update stats
-	s.BasicStats.TestAndSets++
-
-	resp := s.internalGet(key)
-
-	if resp == nil {
-		if prevValue != "" {
-			errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue)
-			return nil, etcdErr.NewError(100, errmsg)
-		}
-		return s.internalSet(key, value, expireTime, index)
-	}
-
-	if resp.Value == prevValue {
-
-		// If test succeed, do set
-		return s.internalSet(key, value, expireTime, index)
-	} else {
-
-		// If fails, return err
-		return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
-			resp.Value, prevValue))
-	}
-
-}
-
-// Add a channel to the watchHub.
-// The watchHub will send response to the channel when any key under the prefix
-// changes [since the sinceIndex if given]
-func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
-	return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
-}
-
-// This function should be created as a go routine to delete the key-value pair
-// when it reaches expiration time
-
-func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
-
-	duration := expireTime.Sub(time.Now())
-
-	for {
-		select {
-
-		// Timeout delete the node
-		case <-time.After(duration):
-			node, ok := s.Tree.get(key)
-
-			if !ok {
-				return
-
-			} else {
-				s.mutex.Lock()
-
-				s.Tree.delete(key)
-
-				resp := Response{
-					Action:     "DELETE",
-					Key:        key,
-					PrevValue:  node.Value,
-					Expiration: &node.ExpireTime,
-					Index:      s.Index,
-				}
-				s.mutex.Unlock()
-
-				msg, err := json.Marshal(resp)
-
-				s.watcher.notify(resp)
-
-				// notify the messager
-				if s.messager != nil && err == nil {
-					s.messager <- string(msg)
-				}
-
-				return
-
-			}
-
-		case updateTime := <-update:
-			// Update duration
-			// If the node become a permanent one, the go routine is
-			// not needed
-			if updateTime.Equal(PERMANENT) {
-				return
-			}
-
-			// Update duration
-			duration = updateTime.Sub(time.Now())
-		}
-	}
-}
-
-// When we receive a command that will change the state of the key-value store
-// We will add the result of it to the ResponseMap for the use of watch command
-// Also we may remove the oldest response when we add new one
-func (s *Store) addToResponseMap(index uint64, resp *Response) {
-
-	// zero case
-	if s.ResponseMaxSize == 0 {
-		return
-	}
-
-	strIndex := strconv.FormatUint(index, 10)
-	s.ResponseMap[strIndex] = resp
-
-	// unlimited
-	if s.ResponseMaxSize < 0 {
-		s.ResponseCurrSize++
-		return
-	}
-
-	// if we reach the max point, we need to delete the most latest
-	// response and update the startIndex
-	if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
-		s.ResponseStartIndex++
-		delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
-	} else {
-		s.ResponseCurrSize++
-	}
-}
-
-func (s *Store) clone() *Store {
-	newStore := &Store{
-		ResponseMaxSize:    s.ResponseMaxSize,
-		ResponseCurrSize:   s.ResponseCurrSize,
-		ResponseStartIndex: s.ResponseStartIndex,
-		Index:              s.Index,
-		BasicStats:         s.BasicStats,
-	}
-
-	newStore.Tree = s.Tree.clone()
-	newStore.ResponseMap = make(map[string]*Response)
-
-	for index, response := range s.ResponseMap {
-		newStore.ResponseMap[index] = response
-	}
-
-	return newStore
-}
-
-// Save the current state of the storage system
-func (s *Store) Save() ([]byte, error) {
-	// first we clone the store
-	// json is very slow, we cannot hold the lock for such a long time
-	s.mutex.Lock()
-	cloneStore := s.clone()
-	s.mutex.Unlock()
-
-	b, err := json.Marshal(cloneStore)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	return b, nil
-}
-
-// Recovery the state of the stroage system from a previous state
-func (s *Store) Recovery(state []byte) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	// we need to stop all the current watchers
-	// recovery will clear watcherHub
-	s.watcher.stopWatchers()
-
-	err := json.Unmarshal(state, s)
-
-	// The only thing need to change after the recovery is the
-	// node with expiration time, we need to delete all the node
-	// that have been expired and setup go routines to monitor the
-	// other ones
-	s.checkExpiration()
-
-	return err
-}
-
-// Clean the expired nodes
-// Set up go routines to mon
-func (s *Store) checkExpiration() {
-	s.Tree.traverse(s.checkNode, false)
-}
-
-// Check each node
-func (s *Store) checkNode(key string, node *Node) {
-
-	if node.ExpireTime.Equal(PERMANENT) {
-		return
-	} else {
-		if node.ExpireTime.Sub(time.Now()) >= time.Second {
-
-			node.update = make(chan time.Time)
-			go s.monitorExpiration(key, node.update, node.ExpireTime)
-
-		} else {
-			// we should delete this node
-			s.Tree.delete(key)
-		}
-	}
-}

+ 0 - 258
store/store_test.go

@@ -1,258 +0,0 @@
-package store
-
-import (
-	"encoding/json"
-	"testing"
-	"time"
-)
-
-func TestStoreGetDelete(t *testing.T) {
-
-	s := CreateStore(100)
-	s.Set("foo", "bar", time.Unix(0, 0), 1)
-	res, err := s.Get("foo")
-
-	if err != nil {
-		t.Fatalf("Unknown error")
-	}
-
-	var result Response
-	json.Unmarshal(res, &result)
-
-	if result.Value != "bar" {
-		t.Fatalf("Cannot get stored value")
-	}
-
-	s.Delete("foo", 2)
-	_, err = s.Get("foo")
-
-	if err == nil {
-		t.Fatalf("Got deleted value")
-	}
-}
-
-func TestTestAndSet(t *testing.T) {
-	s := CreateStore(100)
-	s.Set("foo", "bar", time.Unix(0, 0), 1)
-
-	_, err := s.TestAndSet("foo", "barbar", "barbar", time.Unix(0, 0), 2)
-
-	if err == nil {
-		t.Fatalf("test bar == barbar should fail")
-	}
-
-	_, err = s.TestAndSet("foo", "bar", "barbar", time.Unix(0, 0), 3)
-
-	if err != nil {
-		t.Fatalf("test bar == bar should succeed")
-	}
-
-	_, err = s.TestAndSet("foo", "", "barbar", time.Unix(0, 0), 4)
-
-	if err == nil {
-		t.Fatalf("test empty == bar should fail")
-	}
-
-	_, err = s.TestAndSet("fooo", "bar", "barbar", time.Unix(0, 0), 5)
-
-	if err == nil {
-		t.Fatalf("test bar == non-existing key should fail")
-	}
-
-	_, err = s.TestAndSet("fooo", "", "bar", time.Unix(0, 0), 6)
-
-	if err != nil {
-		t.Fatalf("test empty == non-existing key should succeed")
-	}
-
-}
-
-func TestSaveAndRecovery(t *testing.T) {
-
-	s := CreateStore(100)
-	s.Set("foo", "bar", time.Unix(0, 0), 1)
-	s.Set("foo2", "bar2", time.Now().Add(time.Second*5), 2)
-	state, err := s.Save()
-
-	if err != nil {
-		t.Fatalf("Cannot Save %s", err)
-	}
-
-	newStore := CreateStore(100)
-
-	// wait for foo2 expires
-	time.Sleep(time.Second * 6)
-
-	newStore.Recovery(state)
-
-	res, err := newStore.Get("foo")
-
-	var result Response
-	json.Unmarshal(res, &result)
-
-	if result.Value != "bar" {
-		t.Fatalf("Recovery Fail")
-	}
-
-	res, err = newStore.Get("foo2")
-
-	if err == nil {
-		t.Fatalf("Get expired value")
-	}
-
-	s.Delete("foo", 3)
-
-}
-
-func TestExpire(t *testing.T) {
-	// test expire
-	s := CreateStore(100)
-	s.Set("foo", "bar", time.Now().Add(time.Second*1), 0)
-	time.Sleep(2 * time.Second)
-
-	_, err := s.Get("foo")
-
-	if err == nil {
-		t.Fatalf("Got expired value")
-	}
-
-	//test change expire time
-	s.Set("foo", "bar", time.Now().Add(time.Second*10), 1)
-
-	_, err = s.Get("foo")
-
-	if err != nil {
-		t.Fatalf("Cannot get Value")
-	}
-
-	s.Set("foo", "barbar", time.Now().Add(time.Second*1), 2)
-
-	time.Sleep(2 * time.Second)
-
-	_, err = s.Get("foo")
-
-	if err == nil {
-		t.Fatalf("Got expired value")
-	}
-
-	// test change expire to stable
-	s.Set("foo", "bar", time.Now().Add(time.Second*1), 3)
-
-	s.Set("foo", "bar", time.Unix(0, 0), 4)
-
-	time.Sleep(2 * time.Second)
-
-	_, err = s.Get("foo")
-
-	if err != nil {
-		t.Fatalf("Cannot get Value")
-	}
-
-	// test stable to expire
-	s.Set("foo", "bar", time.Now().Add(time.Second*1), 5)
-	time.Sleep(2 * time.Second)
-	_, err = s.Get("foo")
-
-	if err == nil {
-		t.Fatalf("Got expired value")
-	}
-
-	// test set older node
-	s.Set("foo", "bar", time.Now().Add(-time.Second*1), 6)
-	_, err = s.Get("foo")
-
-	if err == nil {
-		t.Fatalf("Got expired value")
-	}
-
-}
-
-func BenchmarkStoreSet(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 5)
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-
-		for i, key := range keys {
-			s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i))
-		}
-
-		s = CreateStore(100)
-	}
-}
-
-func BenchmarkStoreGet(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 5)
-
-	for i, key := range keys {
-		s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i))
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-
-		for _, key := range keys {
-			s.Get(key)
-		}
-
-	}
-}
-
-func BenchmarkStoreSnapshotCopy(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 5)
-
-	for i, key := range keys {
-		s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i))
-	}
-
-	var state []byte
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		s.clone()
-	}
-	b.SetBytes(int64(len(state)))
-}
-
-func BenchmarkSnapshotSaveJson(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 5)
-
-	for i, key := range keys {
-		s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i))
-	}
-
-	var state []byte
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		state, _ = s.Save()
-	}
-	b.SetBytes(int64(len(state)))
-}
-
-func BenchmarkSnapshotRecovery(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 5)
-
-	for i, key := range keys {
-		s.Set(key, "barbarbarbarbar", time.Unix(0, 0), uint64(i))
-	}
-
-	state, _ := s.Save()
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		newStore := CreateStore(100)
-		newStore.Recovery(state)
-	}
-	b.SetBytes(int64(len(state)))
-}

+ 0 - 21
store/test.go

@@ -1,21 +0,0 @@
-package store
-
-import (
-	"math/rand"
-	"strconv"
-)
-
-// GenKeys randomly generate num of keys with max depth
-func GenKeys(num int, depth int) []string {
-	keys := make([]string, num)
-	for i := 0; i < num; i++ {
-
-		keys[i] = "/foo"
-		depth := rand.Intn(depth) + 1
-
-		for j := 0; j < depth; j++ {
-			keys[i] += "/" + strconv.Itoa(rand.Int()%20)
-		}
-	}
-	return keys
-}

+ 0 - 318
store/tree.go

@@ -1,318 +0,0 @@
-package store
-
-import (
-	"path"
-	"sort"
-	"strings"
-	"time"
-)
-
-//------------------------------------------------------------------------------
-//
-// Typedefs
-//
-//------------------------------------------------------------------------------
-
-// A file system like tree structure. Each non-leaf node of the tree has a hashmap to
-// store its children nodes. Leaf nodes has no hashmap (a nil pointer)
-type tree struct {
-	Root *treeNode
-}
-
-// A treeNode wraps a Node. It has a hashmap to keep records of its children treeNodes.
-type treeNode struct {
-	InternalNode Node
-	Dir          bool
-	NodeMap      map[string]*treeNode
-}
-
-// TreeNode with its key. We use it when we need to sort the treeNodes.
-type tnWithKey struct {
-	key string
-	tn  *treeNode
-}
-
-// Define type and functions to match sort interface
-type tnWithKeySlice []tnWithKey
-
-func (s tnWithKeySlice) Len() int           { return len(s) }
-func (s tnWithKeySlice) Less(i, j int) bool { return s[i].key < s[j].key }
-func (s tnWithKeySlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-
-// CONSTANT VARIABLE
-
-// Represent an empty node
-var emptyNode = Node{"", PERMANENT, nil}
-
-//------------------------------------------------------------------------------
-//
-// Methods
-//
-//------------------------------------------------------------------------------
-
-// Set the key to the given value, return true if success
-// If any intermidate path of the key is not a directory type, it will fail
-// For example if the /foo = Node(bar) exists, set /foo/foo = Node(barbar)
-// will fail.
-func (t *tree) set(key string, value Node) bool {
-
-	nodesName := split(key)
-
-	// avoid set value to "/"
-	if len(nodesName) == 1 && len(nodesName[0]) == 0 {
-		return false
-	}
-
-	nodeMap := t.Root.NodeMap
-
-	i := 0
-	newDir := false
-
-	// go through all the path
-	for i = 0; i < len(nodesName)-1; i++ {
-
-		// if we meet a new directory, all the directory after it must be new
-		if newDir {
-			tn := &treeNode{emptyNode, true, make(map[string]*treeNode)}
-			nodeMap[nodesName[i]] = tn
-			nodeMap = tn.NodeMap
-			continue
-		}
-
-		// get the node from the nodeMap of the current level
-		tn, ok := nodeMap[nodesName[i]]
-
-		if !ok {
-			// add a new directory and set newDir to true
-			newDir = true
-			tn := &treeNode{emptyNode, true, make(map[string]*treeNode)}
-			nodeMap[nodesName[i]] = tn
-			nodeMap = tn.NodeMap
-
-		} else if ok && !tn.Dir {
-
-			// if we meet a non-directory node, we cannot set the key
-			return false
-		} else {
-
-			// update the nodeMap to next level
-			nodeMap = tn.NodeMap
-		}
-
-	}
-
-	// Add the last node
-	tn, ok := nodeMap[nodesName[i]]
-
-	if !ok {
-		// we add a new treeNode
-		tn := &treeNode{value, false, nil}
-		nodeMap[nodesName[i]] = tn
-
-	} else {
-		if tn.Dir {
-			return false
-		}
-		// we change the value of a old Treenode
-		tn.InternalNode = value
-	}
-	return true
-
-}
-
-// Get the tree node of the key
-func (t *tree) internalGet(key string) (*treeNode, bool) {
-	nodesName := split(key)
-
-	// should be able to get root
-	if len(nodesName) == 1 && nodesName[0] == "" {
-		return t.Root, true
-	}
-
-	nodeMap := t.Root.NodeMap
-
-	var i int
-
-	for i = 0; i < len(nodesName)-1; i++ {
-		node, ok := nodeMap[nodesName[i]]
-		if !ok || !node.Dir {
-			return nil, false
-		}
-		nodeMap = node.NodeMap
-	}
-
-	tn, ok := nodeMap[nodesName[i]]
-	if ok {
-		return tn, ok
-	} else {
-		return nil, ok
-	}
-}
-
-// get the internalNode of the key
-func (t *tree) get(key string) (Node, bool) {
-	tn, ok := t.internalGet(key)
-
-	if ok {
-		if tn.Dir {
-			return emptyNode, false
-		}
-		return tn.InternalNode, ok
-	} else {
-		return emptyNode, ok
-	}
-}
-
-// get the internalNode of the key
-func (t *tree) list(directory string) (interface{}, []string, bool) {
-	treeNode, ok := t.internalGet(directory)
-
-	if !ok {
-		return nil, nil, ok
-
-	} else {
-		if !treeNode.Dir {
-			return &treeNode.InternalNode, nil, ok
-		}
-		length := len(treeNode.NodeMap)
-		nodes := make([]*Node, length)
-		keys := make([]string, length)
-
-		i := 0
-		for key, node := range treeNode.NodeMap {
-			nodes[i] = &node.InternalNode
-			keys[i] = key
-			i++
-		}
-
-		return nodes, keys, ok
-	}
-}
-
-// delete the key, return true if success
-func (t *tree) delete(key string) bool {
-	nodesName := split(key)
-
-	nodeMap := t.Root.NodeMap
-
-	var i int
-
-	for i = 0; i < len(nodesName)-1; i++ {
-		node, ok := nodeMap[nodesName[i]]
-		if !ok || !node.Dir {
-			return false
-		}
-		nodeMap = node.NodeMap
-	}
-
-	node, ok := nodeMap[nodesName[i]]
-	if ok && !node.Dir {
-		delete(nodeMap, nodesName[i])
-		return true
-	}
-	return false
-}
-
-// traverse wrapper
-func (t *tree) traverse(f func(string, *Node), sort bool) {
-	if sort {
-		sortDfs("", t.Root, f)
-	} else {
-		dfs("", t.Root, f)
-	}
-}
-
-// clone() will return a deep cloned tree
-func (t *tree) clone() *tree {
-	newTree := new(tree)
-	newTree.Root = &treeNode{
-		Node{
-			"/",
-			time.Unix(0, 0),
-			nil,
-		},
-		true,
-		make(map[string]*treeNode),
-	}
-	recursiveClone(t.Root, newTree.Root)
-	return newTree
-}
-
-// recursiveClone is a helper function for clone()
-func recursiveClone(tnSrc *treeNode, tnDes *treeNode) {
-	if !tnSrc.Dir {
-		tnDes.InternalNode = tnSrc.InternalNode
-		return
-
-	} else {
-		tnDes.InternalNode = tnSrc.InternalNode
-		tnDes.Dir = true
-		tnDes.NodeMap = make(map[string]*treeNode)
-
-		for key, tn := range tnSrc.NodeMap {
-			newTn := new(treeNode)
-			recursiveClone(tn, newTn)
-			tnDes.NodeMap[key] = newTn
-		}
-
-	}
-}
-
-// deep first search to traverse the tree
-// apply the func f to each internal node
-func dfs(key string, t *treeNode, f func(string, *Node)) {
-
-	// base case
-	if len(t.NodeMap) == 0 {
-		f(key, &t.InternalNode)
-
-		// recursion
-	} else {
-		for tnKey, tn := range t.NodeMap {
-			tnKey := key + "/" + tnKey
-			dfs(tnKey, tn, f)
-		}
-	}
-}
-
-// sort deep first search to traverse the tree
-// apply the func f to each internal node
-func sortDfs(key string, t *treeNode, f func(string, *Node)) {
-	// base case
-	if len(t.NodeMap) == 0 {
-		f(key, &t.InternalNode)
-
-		// recursion
-	} else {
-
-		s := make(tnWithKeySlice, len(t.NodeMap))
-		i := 0
-
-		// copy
-		for tnKey, tn := range t.NodeMap {
-			tnKey := key + "/" + tnKey
-			s[i] = tnWithKey{tnKey, tn}
-			i++
-		}
-
-		// sort
-		sort.Sort(s)
-
-		// traverse
-		for i = 0; i < len(t.NodeMap); i++ {
-			sortDfs(s[i].key, s[i].tn, f)
-		}
-	}
-}
-
-// split the key by '/', get the intermediate node name
-func split(key string) []string {
-	key = "/" + key
-	key = path.Clean(key)
-
-	// get the intermidate nodes name
-	nodesName := strings.Split(key, "/")
-	// we do not need the root node, since we start with it
-	nodesName = nodesName[1:]
-	return nodesName
-}

+ 0 - 247
store/tree_store_test.go

@@ -1,247 +0,0 @@
-package store
-
-import (
-	"fmt"
-	"math/rand"
-	"strconv"
-	"testing"
-	"time"
-)
-
-func TestStoreGet(t *testing.T) {
-
-	ts := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	// create key
-	ts.set("/foo", NewTestNode("bar"))
-	// change value
-	ts.set("/foo", NewTestNode("barbar"))
-	// create key
-	ts.set("/hello/foo", NewTestNode("barbarbar"))
-	treeNode, ok := ts.get("/foo")
-
-	if !ok {
-		t.Fatalf("Expect to get node, but not")
-	}
-	if treeNode.Value != "barbar" {
-		t.Fatalf("Expect value barbar, but got %s", treeNode.Value)
-	}
-
-	// create key
-	treeNode, ok = ts.get("/hello/foo")
-	if !ok {
-		t.Fatalf("Expect to get node, but not")
-	}
-	if treeNode.Value != "barbarbar" {
-		t.Fatalf("Expect value barbarbar, but got %s", treeNode.Value)
-	}
-
-	// create a key under other key
-	ok = ts.set("/foo/foo", NewTestNode("bar"))
-	if ok {
-		t.Fatalf("shoud not add key under a exisiting key")
-	}
-
-	// delete a key
-	ok = ts.delete("/foo")
-	if !ok {
-		t.Fatalf("cannot delete key")
-	}
-
-	// delete a directory
-	ok = ts.delete("/hello")
-	if ok {
-		t.Fatalf("Expect cannot delet /hello, but deleted! ")
-	}
-
-	// test list
-	ts.set("/hello/fooo", NewTestNode("barbarbar"))
-	ts.set("/hello/foooo/foo", NewTestNode("barbarbar"))
-
-	nodes, keys, ok := ts.list("/hello")
-
-	if !ok {
-		t.Fatalf("cannot list!")
-	} else {
-		nodes, _ := nodes.([]*Node)
-		length := len(nodes)
-
-		for i := 0; i < length; i++ {
-			fmt.Println(keys[i], "=", nodes[i].Value)
-		}
-	}
-
-	keys = GenKeys(100, 10)
-
-	for i := 0; i < 100; i++ {
-		value := strconv.Itoa(rand.Int())
-		ts.set(keys[i], NewTestNode(value))
-		treeNode, ok := ts.get(keys[i])
-
-		if !ok {
-			continue
-		}
-		if treeNode.Value != value {
-			t.Fatalf("Expect value %s, but got %s", value, treeNode.Value)
-		}
-
-	}
-	ts.traverse(f, true)
-}
-
-func TestTreeClone(t *testing.T) {
-	keys := GenKeys(10000, 10)
-
-	ts := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	backTs := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	// generate the first tree
-	for _, key := range keys {
-		value := strconv.Itoa(rand.Int())
-		ts.set(key, NewTestNode(value))
-		backTs.set(key, NewTestNode(value))
-	}
-
-	copyTs := ts.clone()
-
-	// test if they are identical
-	copyTs.traverse(ts.contain, false)
-
-	// remove all the keys from first tree
-	for _, key := range keys {
-		ts.delete(key)
-	}
-
-	// test if they are identical
-	// make sure changes in the first tree will affect the copy one
-	copyTs.traverse(backTs.contain, false)
-
-}
-
-func BenchmarkTreeStoreSet(b *testing.B) {
-
-	keys := GenKeys(10000, 10)
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-
-		ts := &tree{
-			&treeNode{
-				NewTestNode("/"),
-				true,
-				make(map[string]*treeNode),
-			},
-		}
-
-		for _, key := range keys {
-			value := strconv.Itoa(rand.Int())
-			ts.set(key, NewTestNode(value))
-		}
-	}
-}
-
-func BenchmarkTreeStoreGet(b *testing.B) {
-
-	keys := GenKeys(10000, 10)
-
-	ts := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	for _, key := range keys {
-		value := strconv.Itoa(rand.Int())
-		ts.set(key, NewTestNode(value))
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		for _, key := range keys {
-			ts.get(key)
-		}
-	}
-}
-
-func BenchmarkTreeStoreCopy(b *testing.B) {
-	keys := GenKeys(10000, 10)
-
-	ts := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	for _, key := range keys {
-		value := strconv.Itoa(rand.Int())
-		ts.set(key, NewTestNode(value))
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		ts.clone()
-	}
-}
-
-func BenchmarkTreeStoreList(b *testing.B) {
-
-	keys := GenKeys(10000, 10)
-
-	ts := &tree{
-		&treeNode{
-			NewTestNode("/"),
-			true,
-			make(map[string]*treeNode),
-		},
-	}
-
-	for _, key := range keys {
-		value := strconv.Itoa(rand.Int())
-		ts.set(key, NewTestNode(value))
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		for _, key := range keys {
-			ts.list(key)
-		}
-	}
-}
-
-func (t *tree) contain(key string, node *Node) {
-	_, ok := t.get(key)
-	if !ok {
-		panic("tree do not contain the given key")
-	}
-}
-
-func f(key string, n *Node) {
-	return
-}
-
-func NewTestNode(value string) Node {
-	return Node{value, time.Unix(0, 0), nil}
-}

+ 0 - 129
store/watcher.go

@@ -1,129 +0,0 @@
-package store
-
-import (
-	"path"
-	"strconv"
-	"strings"
-)
-
-//------------------------------------------------------------------------------
-//
-// Typedefs
-//
-//------------------------------------------------------------------------------
-
-// WatcherHub is where the client register its watcher
-type WatcherHub struct {
-	watchers map[string][]*Watcher
-}
-
-// Currently watcher only contains a response channel
-type Watcher struct {
-	C chan *Response
-}
-
-// Create a new watcherHub
-func newWatcherHub() *WatcherHub {
-	w := new(WatcherHub)
-	w.watchers = make(map[string][]*Watcher)
-	return w
-}
-
-// Create a new watcher
-func NewWatcher() *Watcher {
-	return &Watcher{C: make(chan *Response, 1)}
-}
-
-// Add a watcher to the watcherHub
-func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
-	responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
-
-	prefix = path.Clean("/" + prefix)
-
-	if sinceIndex != 0 && sinceIndex >= responseStartIndex {
-		for i := sinceIndex; i <= currentIndex; i++ {
-			if checkResponse(prefix, i, resMap) {
-				watcher.C <- resMap[strconv.FormatUint(i, 10)]
-				return nil
-			}
-		}
-	}
-
-	_, ok := w.watchers[prefix]
-
-	if !ok {
-		w.watchers[prefix] = make([]*Watcher, 0)
-	}
-
-	w.watchers[prefix] = append(w.watchers[prefix], watcher)
-
-	return nil
-}
-
-// Check if the response has what we are watching
-func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
-
-	resp, ok := resMap[strconv.FormatUint(index, 10)]
-
-	if !ok {
-		// not storage system command
-		return false
-	} else {
-		path := resp.Key
-		if strings.HasPrefix(path, prefix) {
-			prefixLen := len(prefix)
-			if len(path) == prefixLen || path[prefixLen] == '/' {
-				return true
-			}
-
-		}
-	}
-
-	return false
-}
-
-// Notify the watcher a action happened
-func (w *WatcherHub) notify(resp Response) error {
-	resp.Key = path.Clean(resp.Key)
-
-	segments := strings.Split(resp.Key, "/")
-	currPath := "/"
-
-	// walk through all the pathes
-	for _, segment := range segments {
-		currPath = path.Join(currPath, segment)
-
-		watchers, ok := w.watchers[currPath]
-
-		if ok {
-
-			newWatchers := make([]*Watcher, 0)
-			// notify all the watchers
-			for _, watcher := range watchers {
-				watcher.C <- &resp
-			}
-
-			if len(newWatchers) == 0 {
-				// we have notified all the watchers at this path
-				// delete the map
-				delete(w.watchers, currPath)
-			} else {
-				w.watchers[currPath] = newWatchers
-			}
-		}
-
-	}
-
-	return nil
-}
-
-// stopWatchers stops all the watchers
-// This function is used when the etcd recovery from a snapshot at runtime
-func (w *WatcherHub) stopWatchers() {
-	for _, subWatchers := range w.watchers {
-		for _, watcher := range subWatchers {
-			watcher.C <- nil
-		}
-	}
-	w.watchers = nil
-}

+ 0 - 84
store/watcher_test.go

@@ -1,84 +0,0 @@
-package store
-
-import (
-	"testing"
-	"time"
-)
-
-func TestWatch(t *testing.T) {
-
-	s := CreateStore(100)
-
-	watchers := make([]*Watcher, 10)
-
-	for i, _ := range watchers {
-
-		// create a new watcher
-		watchers[i] = NewWatcher()
-		// add to the watchers list
-		s.AddWatcher("foo", watchers[i], 0)
-
-	}
-
-	s.Set("/foo/foo", "bar", time.Unix(0, 0), 1)
-
-	for _, watcher := range watchers {
-
-		// wait for the notification for any changing
-		res := <-watcher.C
-
-		if res == nil {
-			t.Fatal("watcher is cleared")
-		}
-	}
-
-	for i, _ := range watchers {
-
-		// create a new watcher
-		watchers[i] = NewWatcher()
-		// add to the watchers list
-		s.AddWatcher("foo/foo/foo", watchers[i], 0)
-
-	}
-
-	s.watcher.stopWatchers()
-
-	for _, watcher := range watchers {
-
-		// wait for the notification for any changing
-		res := <-watcher.C
-
-		if res != nil {
-			t.Fatal("watcher is cleared")
-		}
-	}
-}
-
-// BenchmarkWatch creates 10K watchers watch at /foo/[path] each time.
-// Path is randomly chosen with max depth 10.
-// It should take less than 15ms to wake up 10K watchers.
-func BenchmarkWatch(b *testing.B) {
-	s := CreateStore(100)
-
-	keys := GenKeys(10000, 10)
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		watchers := make([]*Watcher, 10000)
-		for i := 0; i < 10000; i++ {
-			// create a new watcher
-			watchers[i] = NewWatcher()
-			// add to the watchers list
-			s.AddWatcher(keys[i], watchers[i], 0)
-		}
-
-		s.watcher.stopWatchers()
-
-		for _, watcher := range watchers {
-			// wait for the notification for any changing
-			<-watcher.C
-		}
-
-		s.watcher = newWatcherHub()
-	}
-}

+ 7 - 2
util.go

@@ -47,7 +47,7 @@ var storeMsg chan string
 // Help to send msg from store to webHub
 func webHelper() {
 	storeMsg = make(chan string)
-	etcdStore.SetMessager(storeMsg)
+	// etcdStore.SetMessager(storeMsg)
 	for {
 		// transfer the new msg to webHub
 		web.Hub().Send(<-storeMsg)
@@ -177,6 +177,11 @@ func check(err error) {
 	}
 }
 
+func getNodePath(urlPath string) string {
+	pathPrefixLen := len("/" + version + "/keys")
+	return urlPath[pathPrefixLen:]
+}
+
 //--------------------------------------
 // Log
 //--------------------------------------
@@ -259,7 +264,7 @@ func directSet() {
 
 func send(c chan bool) {
 	for i := 0; i < 10; i++ {
-		command := &SetCommand{}
+		command := &UpdateCommand{}
 		command.Key = "foo"
 		command.Value = "bar"
 		command.ExpireTime = time.Unix(0, 0)