Browse Source

Merge pull request #292 from xiangli-cmu/fix-ttl

WIP: fix ttl
Xiang Li 12 years ago
parent
commit
5abbaf59e3

+ 1 - 4
error/error.go

@@ -80,16 +80,14 @@ type Error struct {
 	Message   string `json:"message"`
 	Cause     string `json:"cause,omitempty"`
 	Index     uint64 `json:"index"`
-	Term      uint64 `json:"term"`
 }
 
-func NewError(errorCode int, cause string, index uint64, term uint64) *Error {
+func NewError(errorCode int, cause string, index uint64) *Error {
 	return &Error{
 		ErrorCode: errorCode,
 		Message:   errors[errorCode],
 		Cause:     cause,
 		Index:     index,
-		Term:      term,
 	}
 }
 
@@ -109,7 +107,6 @@ func (e Error) toJsonString() string {
 
 func (e Error) Write(w http.ResponseWriter) {
 	w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
-	w.Header().Add("X-Etcd-Term", fmt.Sprint(e.Term))
 	// 3xx is reft internal error
 	if e.ErrorCode/100 == 3 {
 		http.Error(w, e.toJsonString(), http.StatusInternalServerError)

+ 10 - 10
server/join_command.go

@@ -14,20 +14,20 @@ func init() {
 
 // The JoinCommand adds a node to the cluster.
 type JoinCommand struct {
-	MinVersion int `json:"minVersion"`
-	MaxVersion int `json:"maxVersion"`
-	Name        string `json:"name"`
-	RaftURL     string `json:"raftURL"`
-	EtcdURL     string `json:"etcdURL"`
+	MinVersion int    `json:"minVersion"`
+	MaxVersion int    `json:"maxVersion"`
+	Name       string `json:"name"`
+	RaftURL    string `json:"raftURL"`
+	EtcdURL    string `json:"etcdURL"`
 }
 
 func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand {
 	return &JoinCommand{
 		MinVersion: minVersion,
 		MaxVersion: maxVersion,
-		Name:        name,
-		RaftURL:     raftUrl,
-		EtcdURL:     etcdUrl,
+		Name:       name,
+		RaftURL:    raftUrl,
+		EtcdURL:    etcdUrl,
 	}
 }
 
@@ -54,11 +54,11 @@ func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
 	// Check machine number in the cluster
 	if ps.registry.Count() == ps.MaxClusterSize {
 		log.Debug("Reject join request from ", c.Name)
-		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
+		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex())
 	}
 
 	// Add to shared machine registry.
-	ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
+	ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)
 
 	// Add peer in raft
 	err := server.AddPeer(c.Name, "")

+ 14 - 0
server/peer_server.go

@@ -136,6 +136,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		log.Debugf("%s restart as a follower", s.name)
 	}
 
+	go s.monitorSync()
+
 	// open the snapshot
 	if snapshot {
 		go s.monitorSnapshot()
@@ -424,3 +426,15 @@ func (s *PeerServer) monitorSnapshot() {
 		}
 	}
 }
+
+func (s *PeerServer) monitorSync() {
+	ticker := time.Tick(time.Millisecond * 500)
+	for {
+		select {
+		case now := <-ticker:
+			if s.raftServer.State() == raft.Leader {
+				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
+			}
+		}
+	}
+}

+ 9 - 9
server/registry.go

@@ -38,20 +38,20 @@ func NewRegistry(s store.Store) *Registry {
 }
 
 // Adds a node to the registry.
-func (r *Registry) Register(name string, peerURL string, url string, commitIndex uint64, term uint64) error {
+func (r *Registry) Register(name string, peerURL string, url string) error {
 	r.Lock()
 	defer r.Unlock()
 
 	// Write data to store.
 	key := path.Join(RegistryKey, name)
 	value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
-	_, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term)
+	_, err := r.store.Create(key, value, false, store.Permanent)
 	log.Debugf("Register: %s", name)
 	return err
 }
 
 // Removes a node from the registry.
-func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error {
+func (r *Registry) Unregister(name string) error {
 	r.Lock()
 	defer r.Unlock()
 
@@ -59,14 +59,14 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro
 	// delete(r.nodes, name)
 
 	// Remove the key from the store.
-	_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
+	_, err := r.store.Delete(path.Join(RegistryKey, name), false)
 	log.Debugf("Unregister: %s", name)
 	return err
 }
 
 // Returns the number of nodes in the cluster.
 func (r *Registry) Count() int {
-	e, err := r.store.Get(RegistryKey, false, false, 0, 0)
+	e, err := r.store.Get(RegistryKey, false, false)
 	if err != nil {
 		return 0
 	}
@@ -133,7 +133,7 @@ func (r *Registry) urls(leaderName, selfName string, url func(name string) (stri
 	}
 
 	// Retrieve a list of all nodes.
-	if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
+	if e, _ := r.store.Get(RegistryKey, false, false); e != nil {
 		// Lookup the URL for each one.
 		for _, pair := range e.KVPairs {
 			_, name := filepath.Split(pair.Key)
@@ -160,7 +160,7 @@ func (r *Registry) load(name string) {
 	}
 
 	// Retrieve from store.
-	e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
+	e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
 	if err != nil {
 		return
 	}
@@ -173,7 +173,7 @@ func (r *Registry) load(name string) {
 
 	// Create node.
 	r.nodes[name] = &node{
-		url:         m["etcd"][0],
-		peerURL:     m["raft"][0],
+		url:     m["etcd"][0],
+		peerURL: m["raft"][0],
 	}
 }

+ 1 - 1
server/remove_command.go

@@ -27,7 +27,7 @@ func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) {
 	ps, _ := server.Context().(*PeerServer)
 
 	// Remove node from the shared registry.
-	err := ps.registry.Unregister(c.Name, server.CommitIndex(), server.Term())
+	err := ps.registry.Unregister(c.Name)
 
 	// Delete from stats
 	delete(ps.followersStats.Followers, c.Name)

+ 11 - 4
server/server.go

@@ -232,6 +232,7 @@ func (s *Server) Close() {
 	}
 }
 
+// Dispatch command to the current leader
 func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
 	ps := s.peerServer
 	if ps.raftServer.State() == raft.Leader {
@@ -241,7 +242,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 		}
 
 		if result == nil {
-			return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
 		}
 
 		// response for raft related commands[join/remove]
@@ -259,6 +260,12 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 			e, _ := result.(*store.Event)
 			b, _ = json.Marshal(e)
 
+			// etcd index should be the same as the event index
+			// which is also the last modified index of the node
+			w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
+			w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
+			w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
+
 			if e.IsCreated() {
 				w.WriteHeader(http.StatusCreated)
 			} else {
@@ -275,7 +282,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 
 		// No leader available.
 		if leader == "" {
-			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(300, "", s.Store().Index())
 		}
 
 		var url string
@@ -324,7 +331,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
 func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
 	leader := s.peerServer.RaftServer().Leader()
 	if leader == "" {
-		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
 	}
 	w.WriteHeader(http.StatusOK)
 	url, _ := s.registry.PeerURL(leader)
@@ -355,7 +362,7 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
 
 	leader := s.peerServer.RaftServer().Leader()
 	if leader == "" {
-		return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(300, "", s.Store().Index())
 	}
 	hostname, _ := s.registry.ClientURL(leader)
 	redirect(hostname, w, req)

+ 1 - 1
server/v1/get_key_handler.go

@@ -13,7 +13,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	key := "/" + vars["key"]
 
 	// Retrieve the key from the store.
-	event, err := s.Store().Get(key, false, false, s.CommitIndex(), s.Term())
+	event, err := s.Store().Get(key, false, false)
 	if err != nil {
 		return err
 	}

+ 2 - 2
server/v1/set_key_handler.go

@@ -19,13 +19,13 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	// Parse non-blank value.
 	value := req.Form.Get("value")
 	if len(value) == 0 {
-		return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(200, "Set", s.Store().Index())
 	}
 
 	// Convert time-to-live to an expiration time.
 	expireTime, err := store.TTL(req.Form.Get("ttl"))
 	if err != nil {
-		return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(202, "Set", s.Store().Index())
 	}
 
 	// If the "prevValue" is specified then test-and-set. Otherwise create a new key.

+ 3 - 4
server/v1/watch_key_handler.go

@@ -6,7 +6,6 @@ import (
 	"strconv"
 
 	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
 	"github.com/gorilla/mux"
 )
 
@@ -21,14 +20,14 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	if req.Method == "POST" {
 		sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64)
 		if err != nil {
-			return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(203, "Watch From Index", s.Store().Index())
 		}
 	}
 
 	// Start the watcher on the store.
-	c, err := s.Store().Watch(key, false, sinceIndex, s.CommitIndex(), s.Term())
+	c, err := s.Store().Watch(key, false, sinceIndex)
 	if err != nil {
-		return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(500, key, s.Store().Index())
 	}
 	event := <-c
 

+ 8 - 7
server/v2/get_handler.go

@@ -41,14 +41,14 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		if waitIndex != "" {
 			sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
 			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
 			}
 		}
 
 		// Start the watcher on the store.
-		eventChan, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
+		eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
 		if err != nil {
-			return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(500, key, s.Store().Index())
 		}
 
 		cn, _ := w.(http.CloseNotifier)
@@ -62,17 +62,18 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 	} else { //get
 		// Retrieve the key from the store.
-		event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
+		event, err = s.Store().Get(key, recursive, sorted)
 		if err != nil {
 			return err
 		}
 	}
 
-	w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
-	w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
+	w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
+	w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
+	w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
 	w.WriteHeader(http.StatusOK)
-
 	b, _ := json.Marshal(event)
+
 	w.Write(b)
 
 	return nil

+ 1 - 1
server/v2/post_handler.go

@@ -15,7 +15,7 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	value := req.FormValue("value")
 	expireTime, err := store.TTL(req.FormValue("ttl"))
 	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
 	}
 
 	c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)

+ 4 - 4
server/v2/put_handler.go

@@ -22,7 +22,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	value := req.Form.Get("value")
 	expireTime, err := store.TTL(req.Form.Get("ttl"))
 	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
 	}
 
 	_, valueOk := req.Form["prevValue"]
@@ -59,7 +59,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 		// bad previous index
 		if err != nil {
-			return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store().Index())
 		}
 	} else {
 		prevIndex = 0
@@ -67,7 +67,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 	if valueOk {
 		if prevValue == "" {
-			return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
+			return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store().Index())
 		}
 	}
 
@@ -88,7 +88,7 @@ func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, valu
 func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
 	// Update should give at least one option
 	if value == "" && expireTime.Sub(store.Permanent) == 0 {
-		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store().Index())
 	}
 
 	c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime)

+ 1 - 1
server/v2/tests/delete_handler_test.go

@@ -24,6 +24,6 @@ func TestV2DeleteKey(t *testing.T) {
 		resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
 		body := tests.ReadBody(resp)
 		assert.Nil(t, err, "")
-		assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4,"term":0}`, "")
+		assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","modifiedIndex":2}`, "")
 	})
 }

+ 15 - 8
server/v2/tests/get_handler_test.go

@@ -27,8 +27,7 @@ func TestV2GetKey(t *testing.T) {
 		assert.Equal(t, body["action"], "get", "")
 		assert.Equal(t, body["key"], "/foo/bar", "")
 		assert.Equal(t, body["value"], "XXX", "")
-		assert.Equal(t, body["index"], 3, "")
-		assert.Equal(t, body["term"], 0, "")
+		assert.Equal(t, body["modifiedIndex"], 1, "")
 	})
 }
 
@@ -55,7 +54,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
 		assert.Equal(t, body["action"], "get", "")
 		assert.Equal(t, body["key"], "/foo", "")
 		assert.Equal(t, body["dir"], true, "")
-		assert.Equal(t, body["index"], 4, "")
+		assert.Equal(t, body["modifiedIndex"], 1, "")
 		assert.Equal(t, len(body["kvs"].([]interface{})), 2, "")
 
 		kv0 := body["kvs"].([]interface{})[0].(map[string]interface{})
@@ -81,9 +80,11 @@ func TestV2GetKeyRecursively(t *testing.T) {
 func TestV2WatchKey(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		var body map[string]interface{}
+		c := make(chan bool)
 		go func() {
 			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true"))
 			body = tests.ReadBodyJSON(resp)
+			c <- true
 		}()
 
 		// Make sure response didn't fire early.
@@ -98,12 +99,19 @@ func TestV2WatchKey(t *testing.T) {
 
 		// A response should follow from the GET above.
 		time.Sleep(1 * time.Millisecond)
+
+		select {
+		case <-c:
+
+		default:
+			t.Fatal("cannot get watch result")
+		}
+
 		assert.NotNil(t, body, "")
 		assert.Equal(t, body["action"], "set", "")
 		assert.Equal(t, body["key"], "/foo/bar", "")
 		assert.Equal(t, body["value"], "XXX", "")
-		assert.Equal(t, body["index"], 3, "")
-		assert.Equal(t, body["term"], 0, "")
+		assert.Equal(t, body["modifiedIndex"], 1, "")
 	})
 }
 
@@ -118,7 +126,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		var body map[string]interface{}
 		c := make(chan bool)
 		go func() {
-			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=5"))
+			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=2"))
 			body = tests.ReadBodyJSON(resp)
 			c <- true
 		}()
@@ -156,7 +164,6 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		assert.Equal(t, body["action"], "set", "")
 		assert.Equal(t, body["key"], "/foo/bar", "")
 		assert.Equal(t, body["value"], "YYY", "")
-		assert.Equal(t, body["index"], 4, "")
-		assert.Equal(t, body["term"], 0, "")
+		assert.Equal(t, body["modifiedIndex"], 2, "")
 	})
 }

+ 4 - 4
server/v2/tests/post_handler_test.go

@@ -21,18 +21,18 @@ func TestV2CreateUnique(t *testing.T) {
 		resp, _ := tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "create", "")
-		assert.Equal(t, body["key"], "/foo/bar/3", "")
+		assert.Equal(t, body["key"], "/foo/bar/1", "")
 		assert.Equal(t, body["dir"], true, "")
-		assert.Equal(t, body["index"], 3, "")
+		assert.Equal(t, body["modifiedIndex"], 1, "")
 
 		// Second POST should add next index to list.
 		resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
 		body = tests.ReadBodyJSON(resp)
-		assert.Equal(t, body["key"], "/foo/bar/4", "")
+		assert.Equal(t, body["key"], "/foo/bar/2", "")
 
 		// POST to a different key should add index to that list.
 		resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
 		body = tests.ReadBodyJSON(resp)
-		assert.Equal(t, body["key"], "/foo/baz/5", "")
+		assert.Equal(t, body["key"], "/foo/baz/3", "")
 	})
 }

+ 11 - 11
server/v2/tests/put_handler_test.go

@@ -22,7 +22,7 @@ func TestV2SetKey(t *testing.T) {
 		resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBody(resp)
 		assert.Nil(t, err, "")
-		assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","index":3,"term":0}`, "")
+		assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","modifiedIndex":1}`, "")
 	})
 }
 
@@ -42,7 +42,7 @@ func TestV2SetKeyWithTTL(t *testing.T) {
 
 		// Make sure the expiration date is correct.
 		expiration, _ := time.Parse(time.RFC3339Nano, body["expiration"].(string))
-		assert.Equal(t, expiration.Sub(t0) / time.Second, 20, "")
+		assert.Equal(t, expiration.Sub(t0)/time.Second, 20, "")
 	})
 }
 
@@ -110,7 +110,7 @@ func TestV2UpdateKeySuccess(t *testing.T) {
 		v.Set("value", "XXX")
 		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
-		
+
 		v.Set("value", "YYY")
 		v.Set("prevExist", "true")
 		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
@@ -160,7 +160,7 @@ func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
 // Ensures that a key is set only if the previous index matches.
 //
 //   $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
-//   $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=3
+//   $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=1
 //
 func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
@@ -169,13 +169,13 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
 		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 		v.Set("value", "YYY")
-		v.Set("prevIndex", "3")
+		v.Set("prevIndex", "1")
 		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "compareAndSwap", "")
 		assert.Equal(t, body["prevValue"], "XXX", "")
 		assert.Equal(t, body["value"], "YYY", "")
-		assert.Equal(t, body["index"], 4, "")
+		assert.Equal(t, body["modifiedIndex"], 2, "")
 	})
 }
 
@@ -196,8 +196,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
 		assert.Equal(t, body["message"], "Test Failed", "")
-		assert.Equal(t, body["cause"], "[ != XXX] [10 != 3]", "")
-		assert.Equal(t, body["index"], 4, "")
+		assert.Equal(t, body["cause"], "[ != XXX] [10 != 1]", "")
+		assert.Equal(t, body["index"], 1, "")
 	})
 }
 
@@ -236,7 +236,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
 		assert.Equal(t, body["action"], "compareAndSwap", "")
 		assert.Equal(t, body["prevValue"], "XXX", "")
 		assert.Equal(t, body["value"], "YYY", "")
-		assert.Equal(t, body["index"], 4, "")
+		assert.Equal(t, body["modifiedIndex"], 2, "")
 	})
 }
 
@@ -257,8 +257,8 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
 		assert.Equal(t, body["message"], "Test Failed", "")
-		assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 3]", "")
-		assert.Equal(t, body["index"], 4, "")
+		assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 1]", "")
+		assert.Equal(t, body["index"], 1, "")
 	})
 }
 

+ 1 - 0
store/command_factory.go

@@ -21,6 +21,7 @@ type CommandFactory interface {
 	CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
 	CreateDeleteCommand(key string, recursive bool) raft.Command
 	CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
+	CreateSyncCommand(now time.Time) raft.Command
 }
 
 // RegisterCommandFactory adds a command factory to the global registry.

+ 19 - 23
store/event.go

@@ -14,31 +14,23 @@ const (
 	Expire         = "expire"
 )
 
-const (
-	UndefIndex = 0
-	UndefTerm  = 0
-)
-
 type Event struct {
-	Action     string     `json:"action"`
-	Key        string     `json:"key, omitempty"`
-	Dir        bool       `json:"dir,omitempty"`
-	PrevValue  string     `json:"prevValue,omitempty"`
-	Value      string     `json:"value,omitempty"`
-	KVPairs    kvPairs    `json:"kvs,omitempty"`
-	Expiration *time.Time `json:"expiration,omitempty"`
-	TTL        int64      `json:"ttl,omitempty"` // Time to live in second
-	// The command index of the raft machine when the command is executed
-	Index uint64 `json:"index"`
-	Term  uint64 `json:"term"`
+	Action        string     `json:"action"`
+	Key           string     `json:"key, omitempty"`
+	Dir           bool       `json:"dir,omitempty"`
+	PrevValue     string     `json:"prevValue,omitempty"`
+	Value         string     `json:"value,omitempty"`
+	KVPairs       kvPairs    `json:"kvs,omitempty"`
+	Expiration    *time.Time `json:"expiration,omitempty"`
+	TTL           int64      `json:"ttl,omitempty"` // Time to live in second
+	ModifiedIndex uint64     `json:"modifiedIndex"`
 }
 
-func newEvent(action string, key string, index uint64, term uint64) *Event {
+func newEvent(action string, key string, index uint64) *Event {
 	return &Event{
-		Action: action,
-		Key:    key,
-		Index:  index,
-		Term:   term,
+		Action:        action,
+		Key:           key,
+		ModifiedIndex: index,
 	}
 }
 
@@ -54,6 +46,10 @@ func (e *Event) IsCreated() bool {
 	return false
 }
 
+func (e *Event) Index() uint64 {
+	return e.ModifiedIndex
+}
+
 // Converts an event object into a response object.
 func (event *Event) Response() interface{} {
 	if !event.Dir {
@@ -62,7 +58,7 @@ func (event *Event) Response() interface{} {
 			Key:        event.Key,
 			Value:      event.Value,
 			PrevValue:  event.PrevValue,
-			Index:      event.Index,
+			Index:      event.ModifiedIndex,
 			TTL:        event.TTL,
 			Expiration: event.Expiration,
 		}
@@ -87,7 +83,7 @@ func (event *Event) Response() interface{} {
 				Key:    kv.Key,
 				Value:  kv.Value,
 				Dir:    kv.Dir,
-				Index:  event.Index,
+				Index:  event.ModifiedIndex,
 			}
 		}
 		return responses

+ 9 - 24
store/event_history.go

@@ -12,8 +12,6 @@ type EventHistory struct {
 	Queue      eventQueue
 	StartIndex uint64
 	LastIndex  uint64
-	LastTerm   uint64
-	DupCnt     uint64 // help to compute the watching point with duplicated indexes in the queue
 	rwl        sync.RWMutex
 }
 
@@ -31,21 +29,11 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 	eh.rwl.Lock()
 	defer eh.rwl.Unlock()
 
-	var duped uint64
-
-	if e.Index == UndefIndex {
-		e.Index = eh.LastIndex
-		e.Term = eh.LastTerm
-		duped = 1
-	}
-
 	eh.Queue.insert(e)
 
-	eh.LastIndex = e.Index
-	eh.LastTerm = e.Term
-	eh.DupCnt += duped
+	eh.LastIndex = e.Index()
 
-	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
+	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].ModifiedIndex
 
 	return e
 }
@@ -56,32 +44,31 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	start := index - eh.StartIndex
-
 	// the index should locate after the event history's StartIndex
-	if start < 0 {
+	if index-eh.StartIndex < 0 {
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
 				fmt.Sprintf("the requested history has been cleared [%v/%v]",
-					eh.StartIndex, index), UndefIndex, UndefTerm)
+					eh.StartIndex, index), 0)
 	}
 
 	// the index should locate before the size of the queue minus the duplicate count
-	if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
+	if index > eh.LastIndex { // future index
 		return nil, nil
 	}
 
-	i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
+	i := eh.Queue.Front
 
 	for {
 		e := eh.Queue.Events[i]
-		if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
+
+		if strings.HasPrefix(e.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one
 			return e, nil
 		}
 
 		i = (i + 1) % eh.Queue.Capacity
 
-		if i == eh.Queue.back() { // find nothing, return and watch from current index
+		if i > eh.Queue.back() {
 			return nil, nil
 		}
 	}
@@ -105,8 +92,6 @@ func (eh *EventHistory) clone() *EventHistory {
 		StartIndex: eh.StartIndex,
 		Queue:      clonedQueue,
 		LastIndex:  eh.LastIndex,
-		LastTerm:   eh.LastTerm,
-		DupCnt:     eh.DupCnt,
 	}
 
 }

+ 10 - 10
store/event_test.go

@@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) {
 
 	// Add
 	for i := 0; i < 200; i++ {
-		e := newEvent(Create, "/foo", uint64(i), 1)
+		e := newEvent(Create, "/foo", uint64(i))
 		eh.addEvent(e)
 	}
 
@@ -23,7 +23,7 @@ func TestEventQueue(t *testing.T) {
 	n := eh.Queue.Size
 	for ; n > 0; n-- {
 		e := eh.Queue.Events[i]
-		if e.Index != uint64(j) {
+		if e.Index() != uint64(j) {
 			t.Fatalf("queue error!")
 		}
 		j++
@@ -35,26 +35,26 @@ func TestScanHistory(t *testing.T) {
 	eh := newEventHistory(100)
 
 	// Add
-	eh.addEvent(newEvent(Create, "/foo", 1, 1))
-	eh.addEvent(newEvent(Create, "/foo/bar", 2, 1))
-	eh.addEvent(newEvent(Create, "/foo/foo", 3, 1))
-	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1))
-	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1))
+	eh.addEvent(newEvent(Create, "/foo", 1))
+	eh.addEvent(newEvent(Create, "/foo/bar", 2))
+	eh.addEvent(newEvent(Create, "/foo/foo", 3))
+	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4))
+	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5))
 
 	e, err := eh.scan("/foo", 1)
-	if err != nil || e.Index != 1 {
+	if err != nil || e.Index() != 1 {
 		t.Fatalf("scan error [/foo] [1] %v", e.Index)
 	}
 
 	e, err = eh.scan("/foo/bar", 1)
 
-	if err != nil || e.Index != 2 {
+	if err != nil || e.Index() != 2 {
 		t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
 	}
 
 	e, err = eh.scan("/foo/bar", 3)
 
-	if err != nil || e.Index != 4 {
+	if err != nil || e.Index() != 4 {
 		t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
 	}
 

+ 80 - 0
store/heap_test.go

@@ -0,0 +1,80 @@
+package store
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestHeapPushPop(t *testing.T) {
+	h := newTtlKeyHeap()
+
+	// add from older expire time to earlier expire time
+	// the path is equal to ttl from now
+	for i := 0; i < 10; i++ {
+		path := fmt.Sprintf("%v", 10-i)
+		m := time.Duration(10 - i)
+		n := newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
+		h.push(n)
+	}
+
+	min := time.Now()
+
+	for i := 0; i < 10; i++ {
+		node := h.pop()
+		if node.ExpireTime.Before(min) {
+			t.Fatal("heap sort wrong!")
+		}
+		min = node.ExpireTime
+	}
+
+}
+
+func TestHeapUpdate(t *testing.T) {
+	h := newTtlKeyHeap()
+
+	kvs := make([]*Node, 10)
+
+	// add from older expire time to earlier expire time
+	// the path is equal to ttl from now
+	for i, n := range kvs {
+		path := fmt.Sprintf("%v", 10-i)
+		m := time.Duration(10 - i)
+		n = newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
+		kvs[i] = n
+		h.push(n)
+	}
+
+	// Path 7
+	kvs[3].ExpireTime = time.Now().Add(time.Second * 11)
+
+	// Path 5
+	kvs[5].ExpireTime = time.Now().Add(time.Second * 12)
+
+	h.update(kvs[3])
+	h.update(kvs[5])
+
+	min := time.Now()
+
+	for i := 0; i < 10; i++ {
+		node := h.pop()
+		if node.ExpireTime.Before(min) {
+			t.Fatal("heap sort wrong!")
+		}
+		min = node.ExpireTime
+
+		if i == 8 {
+			if node.Path != "7" {
+				t.Fatal("heap sort wrong!", node.Path)
+			}
+		}
+
+		if i == 9 {
+			if node.Path != "5" {
+				t.Fatal("heap sort wrong!")
+			}
+		}
+
+	}
+
+}

+ 7 - 6
store/kv_pairs.go

@@ -6,12 +6,13 @@ import (
 
 // When user list a directory, we add all the node into key-value pair slice
 type KeyValuePair struct {
-	Key        string     `json:"key, omitempty"`
-	Value      string     `json:"value,omitempty"`
-	Dir        bool       `json:"dir,omitempty"`
-	Expiration *time.Time `json:"expiration,omitempty"`
-	TTL        int64      `json:"ttl,omitempty"` // Time to live in second
-	KVPairs    kvPairs    `json:"kvs,omitempty"`
+	Key           string     `json:"key, omitempty"`
+	Value         string     `json:"value,omitempty"`
+	Dir           bool       `json:"dir,omitempty"`
+	Expiration    *time.Time `json:"expiration,omitempty"`
+	TTL           int64      `json:"ttl,omitempty"` // Time to live in second
+	KVPairs       kvPairs    `json:"kvs,omitempty"`
+	ModifiedIndex uint64     `json:"modifiedIndex,omitempty"`
 }
 
 type kvPairs []KeyValuePair

+ 63 - 147
store/node.go

@@ -3,20 +3,12 @@ package store
 import (
 	"path"
 	"sort"
-	"sync"
 	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
 )
 
-var (
-	Permanent time.Time
-)
-
-const (
-	normal = iota
-	removed
-)
+var Permanent time.Time
 
 // Node is the basic element in the store system.
 // A key-value pair will have a string value
@@ -25,11 +17,9 @@ type Node struct {
 	Path string
 
 	CreateIndex   uint64
-	CreateTerm    uint64
 	ModifiedIndex uint64
-	ModifiedTerm  uint64
 
-	Parent *Node `json:"-"` // should not encode this field! avoid cyclical dependency.
+	Parent *Node `json:"-"` // should not encode this field! avoid circular dependency.
 
 	ExpireTime time.Time
 	ACL        string
@@ -38,49 +28,37 @@ type Node struct {
 
 	// A reference to the store this node is attached to.
 	store *store
-
-	// a ttl node will have an expire routine associated with it.
-	// we need a channel to stop that routine when the expiration changes.
-	stopExpire chan bool
-
-	// ensure we only delete the node once
-	// expire and remove may try to delete a node twice
-	once sync.Once
 }
 
 // newKV creates a Key-Value pair
 func newKV(store *store, nodePath string, value string, createIndex uint64,
-	createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+	parent *Node, ACL string, expireTime time.Time) *Node {
 
 	return &Node{
 		Path:          nodePath,
 		CreateIndex:   createIndex,
-		CreateTerm:    createTerm,
 		ModifiedIndex: createIndex,
-		ModifiedTerm:  createTerm,
 		Parent:        parent,
 		ACL:           ACL,
 		store:         store,
-		stopExpire:    make(chan bool, 1),
 		ExpireTime:    expireTime,
 		Value:         value,
 	}
 }
 
 // newDir creates a directory
-func newDir(store *store, nodePath string, createIndex uint64, createTerm uint64,
-	parent *Node, ACL string, expireTime time.Time) *Node {
+func newDir(store *store, nodePath string, createIndex uint64, parent *Node,
+	ACL string, expireTime time.Time) *Node {
 
 	return &Node{
-		Path:        nodePath,
-		CreateIndex: createIndex,
-		CreateTerm:  createTerm,
-		Parent:      parent,
-		ACL:         ACL,
-		stopExpire:  make(chan bool, 1),
-		ExpireTime:  expireTime,
-		Children:    make(map[string]*Node),
-		store:       store,
+		Path:          nodePath,
+		CreateIndex:   createIndex,
+		ModifiedIndex: createIndex,
+		Parent:        parent,
+		ACL:           ACL,
+		ExpireTime:    expireTime,
+		Children:      make(map[string]*Node),
+		store:         store,
 	}
 }
 
@@ -97,21 +75,10 @@ func (n *Node) IsHidden() bool {
 
 // IsPermanent function checks if the node is a permanent one.
 func (n *Node) IsPermanent() bool {
-	return n.ExpireTime.Sub(Permanent) == 0
-}
-
-// IsExpired function checks if the node has been expired.
-func (n *Node) IsExpired() (bool, time.Duration) {
-	if n.IsPermanent() {
-		return false, 0
-	}
-
-	duration := n.ExpireTime.Sub(time.Now())
-	if duration <= 0 {
-		return true, 0
-	}
-
-	return false, duration
+	// we use a uninitialized time.Time to indicate the node is a
+	// permanent one.
+	// the uninitialized time.Time should equal zero.
+	return n.ExpireTime.IsZero()
 }
 
 // IsDir function checks whether the node is a directory.
@@ -125,7 +92,7 @@ func (n *Node) IsDir() bool {
 // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
 func (n *Node) Read() (string, *etcdErr.Error) {
 	if n.IsDir() {
-		return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
+		return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
 	}
 
 	return n.Value, nil
@@ -133,20 +100,19 @@ func (n *Node) Read() (string, *etcdErr.Error) {
 
 // Write function set the value of the node to the given value.
 // If the receiver node is a directory, a "Not A File" error will be returned.
-func (n *Node) Write(value string, index uint64, term uint64) *etcdErr.Error {
+func (n *Node) Write(value string, index uint64) *etcdErr.Error {
 	if n.IsDir() {
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
 	}
 
 	n.Value = value
 	n.ModifiedIndex = index
-	n.ModifiedTerm = term
 
 	return nil
 }
 
 func (n *Node) ExpirationAndTTL() (*time.Time, int64) {
-	if n.ExpireTime.Sub(Permanent) != 0 {
+	if !n.IsPermanent() {
 		return &n.ExpireTime, int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
 	}
 	return nil, 0
@@ -156,7 +122,7 @@ func (n *Node) ExpirationAndTTL() (*time.Time, int64) {
 // If the receiver node is not a directory, a "Not A Directory" error will be returned.
 func (n *Node) List() ([]*Node, *etcdErr.Error) {
 	if !n.IsDir() {
-		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "", n.store.Index())
 	}
 
 	nodes := make([]*Node, len(n.Children))
@@ -174,7 +140,7 @@ func (n *Node) List() ([]*Node, *etcdErr.Error) {
 // On success, it returns the file node
 func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) {
 	if !n.IsDir() {
-		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path, UndefIndex, UndefTerm)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path, n.store.Index())
 	}
 
 	child, ok := n.Children[name]
@@ -192,7 +158,7 @@ func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) {
 // error will be returned
 func (n *Node) Add(child *Node) *etcdErr.Error {
 	if !n.IsDir() {
-		return etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeNotDir, "", n.store.Index())
 	}
 
 	_, name := path.Split(child.Path)
@@ -200,7 +166,7 @@ func (n *Node) Add(child *Node) *etcdErr.Error {
 	_, ok := n.Children[name]
 
 	if ok {
-		return etcdErr.NewError(etcdErr.EcodeNodeExist, "", UndefIndex, UndefTerm)
+		return etcdErr.NewError(etcdErr.EcodeNodeExist, "", n.store.Index())
 	}
 
 	n.Children[name] = child
@@ -213,22 +179,9 @@ func (n *Node) Remove(recursive bool, callback func(path string)) *etcdErr.Error
 
 	if n.IsDir() && !recursive {
 		// cannot delete a directory without set recursive to true
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
-	}
-
-	onceBody := func() {
-		n.internalRemove(recursive, callback)
+		return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
 	}
 
-	// this function might be entered multiple times by expire and delete
-	// every node will only be deleted once.
-	n.once.Do(onceBody)
-
-	return nil
-}
-
-// internalRemove function will be called by remove()
-func (n *Node) internalRemove(recursive bool, callback func(path string)) {
 	if !n.IsDir() { // key-value pair
 		_, name := path.Split(n.Path)
 
@@ -241,9 +194,11 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
 			callback(n.Path)
 		}
 
-		// the stop channel has a buffer. just send to it!
-		n.stopExpire <- true
-		return
+		if !n.IsPermanent() {
+			n.store.ttlKeyHeap.remove(n)
+		}
+
+		return nil
 	}
 
 	for _, child := range n.Children { // delete all children
@@ -259,68 +214,21 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
 			callback(n.Path)
 		}
 
-		n.stopExpire <- true
-	}
-}
-
-// Expire function will test if the node is expired.
-// if the node is already expired, delete the node and return.
-// if the node is permanent (this shouldn't happen), return at once.
-// else wait for a period time, then remove the node. and notify the watchhub.
-func (n *Node) Expire() {
-	expired, duration := n.IsExpired()
-
-	if expired { // has been expired
-		// since the parent function of Expire() runs serially,
-		// there is no need for lock here
-		e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
-		n.store.WatcherHub.notify(e)
-
-		n.Remove(true, nil)
-		n.store.Stats.Inc(ExpireCount)
-
-		return
-	}
+		if !n.IsPermanent() {
+			n.store.ttlKeyHeap.remove(n)
+		}
 
-	if duration == 0 { // Permanent Node
-		return
 	}
 
-	go func() { // do monitoring
-		select {
-		// if timeout, delete the node
-		case <-time.After(duration):
-
-			// before expire get the lock, the expiration time
-			// of the node may be updated.
-			// we have to check again when get the lock
-			n.store.worldLock.Lock()
-			defer n.store.worldLock.Unlock()
-
-			expired, _ := n.IsExpired()
-
-			if expired {
-				e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
-				n.store.WatcherHub.notify(e)
-
-				n.Remove(true, nil)
-				n.store.Stats.Inc(ExpireCount)
-			}
-
-			return
-
-		// if stopped, return
-		case <-n.stopExpire:
-			return
-		}
-	}()
+	return nil
 }
 
 func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 	if n.IsDir() {
 		pair := KeyValuePair{
-			Key: n.Path,
-			Dir: true,
+			Key:           n.Path,
+			Dir:           true,
+			ModifiedIndex: n.ModifiedIndex,
 		}
 		pair.Expiration, pair.TTL = n.ExpirationAndTTL()
 
@@ -356,28 +264,35 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 	}
 
 	pair := KeyValuePair{
-		Key:   n.Path,
-		Value: n.Value,
+		Key:           n.Path,
+		Value:         n.Value,
+		ModifiedIndex: n.ModifiedIndex,
 	}
 	pair.Expiration, pair.TTL = n.ExpirationAndTTL()
 	return pair
 }
 
 func (n *Node) UpdateTTL(expireTime time.Time) {
-	if !n.IsPermanent() {
-		// check if the node has been expired
-		// if the node is not expired, we need to stop the go routine associated with
-		// that node.
-		expired, _ := n.IsExpired()
 
-		if !expired {
-			n.stopExpire <- true // suspend it to modify the expiration
+	if !n.IsPermanent() {
+		if expireTime.IsZero() {
+			// from ttl to permanent
+			// remove from ttl heap
+			n.store.ttlKeyHeap.remove(n)
+		} else {
+			// update ttl
+			n.ExpireTime = expireTime
+			// update ttl heap
+			n.store.ttlKeyHeap.update(n)
 		}
-	}
 
-	n.ExpireTime = expireTime
-	if expireTime.Sub(Permanent) != 0 {
-		n.Expire()
+	} else {
+		if !expireTime.IsZero() {
+			// from permanent to ttl
+			n.ExpireTime = expireTime
+			// push into ttl heap
+			n.store.ttlKeyHeap.push(n)
+		}
 	}
 }
 
@@ -386,10 +301,10 @@ func (n *Node) UpdateTTL(expireTime time.Time) {
 // If the node is a key-value pair, it will clone the pair.
 func (n *Node) Clone() *Node {
 	if !n.IsDir() {
-		return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+		return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
 	}
 
-	clone := newDir(n.store, n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+	clone := newDir(n.store, n.Path, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
 
 	for key, child := range n.Children {
 		clone.Children[key] = child.Clone()
@@ -414,7 +329,8 @@ func (n *Node) recoverAndclean() {
 		}
 	}
 
-	n.stopExpire = make(chan bool, 1)
+	if !n.ExpireTime.IsZero() {
+		n.store.ttlKeyHeap.push(n)
+	}
 
-	n.Expire()
 }

+ 27 - 20
store/stats_test.go

@@ -10,85 +10,92 @@ import (
 // Ensure that a successful Get is recorded in the stats.
 func TestStoreStatsGetSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.Get("/foo", false, false, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.Get("/foo", false, false)
 	assert.Equal(t, uint64(1), s.Stats.GetSuccess, "")
 }
 
 // Ensure that a failed Get is recorded in the stats.
 func TestStoreStatsGetFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.Get("/no_such_key", false, false, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.Get("/no_such_key", false, false)
 	assert.Equal(t, uint64(1), s.Stats.GetFail, "")
 }
 
 // Ensure that a successful Create is recorded in the stats.
 func TestStoreStatsCreateSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "")
 }
 
 // Ensure that a failed Create is recorded in the stats.
 func TestStoreStatsCreateFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 3, 1)
-	s.Create("/foo", "bar", false, Permanent, 4, 1)
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo", "bar", false, Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CreateFail, "")
 }
 
 // Ensure that a successful Update is recorded in the stats.
 func TestStoreStatsUpdateSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.Update("/foo", "baz", Permanent, 4, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.Update("/foo", "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "")
 }
 
 // Ensure that a failed Update is recorded in the stats.
 func TestStoreStatsUpdateFail(t *testing.T) {
 	s := newStore()
-	s.Update("/foo", "bar", Permanent, 4, 1)
+	s.Update("/foo", "bar", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.UpdateFail, "")
 }
 
 // Ensure that a successful CAS is recorded in the stats.
 func TestStoreStatsCompareAndSwapSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 4, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "")
 }
 
 // Ensure that a failed CAS is recorded in the stats.
 func TestStoreStatsCompareAndSwapFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent, 4, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "")
 }
 
 // Ensure that a successful Delete is recorded in the stats.
 func TestStoreStatsDeleteSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 3, 1)
-	s.Delete("/foo", false, 4, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	s.Delete("/foo", false)
 	assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "")
 }
 
 // Ensure that a failed Delete is recorded in the stats.
 func TestStoreStatsDeleteFail(t *testing.T) {
 	s := newStore()
-	s.Delete("/foo", false, 4, 1)
+	s.Delete("/foo", false)
 	assert.Equal(t, uint64(1), s.Stats.DeleteFail, "")
 }
 
-// Ensure that the number of expirations is recorded in the stats.
+//Ensure that the number of expirations is recorded in the stats.
 func TestStoreStatsExpireCount(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, time.Now().Add(5 * time.Millisecond), 3, 1)
+
+	c := make(chan bool)
+	defer func() {
+		c <- true
+	}()
+
+	go mockSyncService(s.DeleteExpiredKeys, c)
+	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
 	assert.Equal(t, uint64(0), s.Stats.ExpireCount, "")
-	time.Sleep(10 * time.Millisecond)
+	time.Sleep(600 * time.Millisecond)
 	assert.Equal(t, uint64(1), s.Stats.ExpireCount, "")
 }

+ 108 - 64
store/store.go

@@ -35,28 +35,30 @@ const defaultVersion = 2
 type Store interface {
 	Version() int
 	CommandFactory() CommandFactory
-	Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
-	Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
-	Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
-	Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
-		index uint64, term uint64) (*Event, error)
+	Index() uint64
+	Get(nodePath string, recursive, sorted bool) (*Event, error)
+	Set(nodePath string, value string, expireTime time.Time) (*Event, error)
+	Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
+	Create(nodePath string, value string, incrementalSuffix bool,
+		expireTime time.Time) (*Event, error)
 	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
-		value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
-	Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
-	Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
+		value string, expireTime time.Time) (*Event, error)
+	Delete(nodePath string, recursive bool) (*Event, error)
+	Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
 	Save() ([]byte, error)
 	Recovery(state []byte) error
 	TotalTransactions() uint64
 	JsonStats() []byte
+	DeleteExpiredKeys(cutoff time.Time)
 }
 
 type store struct {
 	Root           *Node
 	WatcherHub     *watcherHub
-	Index          uint64
-	Term           uint64
+	CurrentIndex   uint64
 	Stats          *Stats
 	CurrentVersion int
+	ttlKeyHeap     *ttlKeyHeap  // need to recovery manually
 	worldLock      sync.RWMutex // stop the world lock
 }
 
@@ -67,9 +69,10 @@ func New() Store {
 func newStore() *store {
 	s := new(store)
 	s.CurrentVersion = defaultVersion
-	s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
+	s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
 	s.Stats = newStats()
 	s.WatcherHub = newWatchHub(1000)
+	s.ttlKeyHeap = newTtlKeyHeap()
 	return s
 }
 
@@ -78,6 +81,11 @@ func (s *store) Version() int {
 	return s.CurrentVersion
 }
 
+// Retrieves current of the store
+func (s *store) Index() uint64 {
+	return s.CurrentIndex
+}
+
 // CommandFactory retrieves the command factory for the current version of the store.
 func (s *store) CommandFactory() CommandFactory {
 	return GetCommandFactory(s.Version())
@@ -86,20 +94,20 @@ func (s *store) CommandFactory() CommandFactory {
 // Get function returns a get event.
 // If recursive is true, it will return all the content under the node path.
 // If sorted is true, it will sort the content by keys.
-func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
+func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
 	nodePath = path.Clean(path.Join("/", nodePath))
 
-	n, err := s.internalGet(nodePath, index, term)
+	n, err := s.internalGet(nodePath)
 
 	if err != nil {
 		s.Stats.Inc(GetFail)
 		return nil, err
 	}
 
-	e := newEvent(Get, nodePath, index, term)
+	e := newEvent(Get, nodePath, n.ModifiedIndex)
 
 	if n.IsDir() { // node is a directory
 		e.Dir = true
@@ -141,13 +149,12 @@ func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term
 // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
-func (s *store) Create(nodePath string, value string, unique bool,
-	expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
+	e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
 
 	if err == nil {
 		s.Stats.Inc(CreateSuccess)
@@ -159,12 +166,12 @@ func (s *store) Create(nodePath string, value string, unique bool,
 }
 
 // Set function creates or replace the Node at nodePath.
-func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
+	e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
 
 	if err == nil {
 		s.Stats.Inc(SetSuccess)
@@ -176,14 +183,14 @@ func (s *store) Set(nodePath string, value string, expireTime time.Time, index u
 }
 
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
-	value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+	value string, expireTime time.Time) (*Event, error) {
 
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 
-	n, err := s.internalGet(nodePath, index, term)
+	n, err := s.internalGet(nodePath)
 
 	if err != nil {
 		s.Stats.Inc(CompareAndSwapFail)
@@ -192,17 +199,20 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 
 	if n.IsDir() { // can only test and set file
 		s.Stats.Inc(CompareAndSwapFail)
-		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
 	}
 
 	// If both of the prevValue and prevIndex are given, we will test both of them.
 	// Command will be executed, only if both of the tests are successful.
 	if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
-		e := newEvent(CompareAndSwap, nodePath, index, term)
+		// update etcd index
+		s.CurrentIndex++
+
+		e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex)
 		e.PrevValue = n.Value
 
 		// if test succeed, write the value
-		n.Write(value, index, term)
+		n.Write(value, s.CurrentIndex)
 		n.UpdateTTL(expireTime)
 
 		e.Value = value
@@ -215,25 +225,27 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 
 	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
 	s.Stats.Inc(CompareAndSwapFail)
-	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
+	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
 }
 
 // Delete function deletes the node at the given path.
 // If the node is a directory, recursive must be true to delete it.
-func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
+func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 
-	n, err := s.internalGet(nodePath, index, term)
+	nextIndex := s.CurrentIndex + 1
+
+	n, err := s.internalGet(nodePath)
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(DeleteFail)
 		return nil, err
 	}
 
-	e := newEvent(Delete, nodePath, index, term)
+	e := newEvent(Delete, nodePath, nextIndex)
 
 	if n.IsDir() {
 		e.Dir = true
@@ -253,33 +265,37 @@ func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint6
 		return nil, err
 	}
 
+	// update etcd index
+	s.CurrentIndex++
+
 	s.WatcherHub.notify(e)
 	s.Stats.Inc(DeleteSuccess)
 
 	return e, nil
 }
 
-func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
+func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
 	prefix = path.Clean(path.Join("/", prefix))
 
+	nextIndex := s.CurrentIndex + 1
+
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
-	s.Index, s.Term = index, term
-
 	var c <-chan *Event
 	var err *etcdErr.Error
 
 	if sinceIndex == 0 {
-		c, err = s.WatcherHub.watch(prefix, recursive, index+1)
+		c, err = s.WatcherHub.watch(prefix, recursive, nextIndex)
 
 	} else {
 		c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
 	}
 
 	if err != nil {
-		err.Index = index
-		err.Term = term
+		// watchhub do not know the current Index
+		// we need to attach the currentIndex here
+		err.Index = s.CurrentIndex
 		return nil, err
 	}
 
@@ -311,52 +327,59 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string
 // Update function updates the value/ttl of the node.
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
-func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
+
+	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
+
 	nodePath = path.Clean(path.Join("/", nodePath))
 
-	n, err := s.internalGet(nodePath, index, term)
+	n, err := s.internalGet(nodePath)
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(UpdateFail)
 		return nil, err
 	}
 
-	e := newEvent(Update, nodePath, s.Index, s.Term)
+	e := newEvent(Update, nodePath, nextIndex)
 
 	if len(newValue) != 0 {
 		if n.IsDir() {
 			// if the node is a directory, we cannot update value
 			s.Stats.Inc(UpdateFail)
-			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
+			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
 		}
 
 		e.PrevValue = n.Value
-		n.Write(newValue, index, term)
+		n.Write(newValue, nextIndex)
+		e.Value = newValue
+	} else {
+		// do not update value
+		e.Value = n.Value
 	}
 
 	// update ttl
 	n.UpdateTTL(expireTime)
 
-	e.Value = newValue
-
 	e.Expiration, e.TTL = n.ExpirationAndTTL()
 
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(UpdateSuccess)
 
+	s.CurrentIndex = nextIndex
+
 	return e, nil
 }
 
 func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
-	expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
+	expireTime time.Time, action string) (*Event, error) {
 
-	s.Index, s.Term = index, term
+	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
 
 	if unique { // append unique item under the node path
-		nodePath += "/" + strconv.FormatUint(index, 10)
+		nodePath += "/" + strconv.FormatUint(nextIndex, 10)
 	}
 
 	nodePath = path.Clean(path.Join("/", nodePath))
@@ -368,11 +391,11 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
-		err.Index, err.Term = s.Index, s.Term
+		err.Index = currIndex
 		return nil, err
 	}
 
-	e := newEvent(action, nodePath, s.Index, s.Term)
+	e := newEvent(action, nodePath, nextIndex)
 
 	n, _ := d.GetChild(newNodeName)
 
@@ -380,25 +403,25 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 	if n != nil {
 		if replace {
 			if n.IsDir() {
-				return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
+				return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
 			}
 			e.PrevValue, _ = n.Read()
 
 			n.Remove(false, nil)
 		} else {
-			return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
+			return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
 		}
 	}
 
 	if len(value) != 0 { // create file
 		e.Value = value
 
-		n = newKV(s, nodePath, value, index, term, d, "", expireTime)
+		n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
 
 	} else { // create directory
 		e.Dir = true
 
-		n = newDir(s, nodePath, index, term, d, "", expireTime)
+		n = newDir(s, nodePath, nextIndex, d, "", expireTime)
 
 	}
 
@@ -406,28 +429,26 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 	d.Add(n)
 
 	// Node with TTL
-	if expireTime.Sub(Permanent) != 0 {
-		n.Expire()
+	if !n.IsPermanent() {
+		s.ttlKeyHeap.push(n)
+
 		e.Expiration, e.TTL = n.ExpirationAndTTL()
 	}
 
+	s.CurrentIndex = nextIndex
+
 	s.WatcherHub.notify(e)
 	return e, nil
 }
 
 // InternalGet function get the node of the given nodePath.
-func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
+func (s *store) internalGet(nodePath string) (*Node, *etcdErr.Error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 
-	// update file system known index and term
-	if index > s.Index {
-		s.Index, s.Term = index, term
-	}
-
 	walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
 
 		if !parent.IsDir() {
-			err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
+			err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
 			return nil, err
 		}
 
@@ -436,7 +457,7 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 			return child, nil
 		}
 
-		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
+		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
 	}
 
 	f, err := s.walk(nodePath, walkFunc)
@@ -447,6 +468,28 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 	return f, nil
 }
 
+// deleteExpiredKyes will delete all
+func (s *store) DeleteExpiredKeys(cutoff time.Time) {
+	s.worldLock.Lock()
+	defer s.worldLock.Unlock()
+
+	for {
+		node := s.ttlKeyHeap.top()
+		if node == nil || node.ExpireTime.After(cutoff) {
+			break
+		}
+
+		s.ttlKeyHeap.pop()
+		node.Remove(true, nil)
+
+		s.CurrentIndex++
+
+		s.Stats.Inc(ExpireCount)
+		s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex))
+	}
+
+}
+
 // checkDir function will check whether the component is a directory under parent node.
 // If it is a directory, this function will return the pointer to that node.
 // If it does not exist, this function will create a new directory and return the pointer to that node.
@@ -459,10 +502,10 @@ func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
 			return node, nil
 		}
 
-		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
 	}
 
-	n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
+	n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
 
 	parent.Children[dirName] = n
 
@@ -477,8 +520,7 @@ func (s *store) Save() ([]byte, error) {
 	s.worldLock.Lock()
 
 	clonedStore := newStore()
-	clonedStore.Index = s.Index
-	clonedStore.Term = s.Term
+	clonedStore.CurrentIndex = s.CurrentIndex
 	clonedStore.Root = s.Root.Clone()
 	clonedStore.WatcherHub = s.WatcherHub.clone()
 	clonedStore.Stats = s.Stats.clone()
@@ -508,6 +550,8 @@ func (s *store) Recovery(state []byte) error {
 		return err
 	}
 
+	s.ttlKeyHeap = newTtlKeyHeap()
+
 	s.Root.recoverAndclean()
 	return nil
 }

+ 148 - 96
store/store_test.go

@@ -27,8 +27,8 @@ import (
 // Ensure that the store can retrieve an existing value.
 func TestStoreGetValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, err := s.Get("/foo", false, false, 2, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.Get("/foo", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "get", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -39,14 +39,14 @@ func TestStoreGetValue(t *testing.T) {
 // Note that hidden files should not be returned.
 func TestStoreGetDirectory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	s.Create("/foo/bar", "X", false, Permanent, 3, 1)
-	s.Create("/foo/_hidden", "*", false, Permanent, 4, 1)
-	s.Create("/foo/baz", "", false, Permanent, 5, 1)
-	s.Create("/foo/baz/bat", "Y", false, Permanent, 6, 1)
-	s.Create("/foo/baz/_hidden", "*", false, Permanent, 7, 1)
-	s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3), 8, 1)
-	e, err := s.Get("/foo", true, false, 8, 1)
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo/bar", "X", false, Permanent)
+	s.Create("/foo/_hidden", "*", false, Permanent)
+	s.Create("/foo/baz", "", false, Permanent)
+	s.Create("/foo/baz/bat", "Y", false, Permanent)
+	s.Create("/foo/baz/_hidden", "*", false, Permanent)
+	s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3))
+	e, err := s.Get("/foo", true, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "get", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -69,13 +69,13 @@ func TestStoreGetDirectory(t *testing.T) {
 // Ensure that the store can retrieve a directory in sorted order.
 func TestStoreGetSorted(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	s.Create("/foo/x", "0", false, Permanent, 3, 1)
-	s.Create("/foo/z", "0", false, Permanent, 4, 1)
-	s.Create("/foo/y", "", false, Permanent, 5, 1)
-	s.Create("/foo/y/a", "0", false, Permanent, 6, 1)
-	s.Create("/foo/y/b", "0", false, Permanent, 7, 1)
-	e, err := s.Get("/foo", true, true, 8, 1)
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo/x", "0", false, Permanent)
+	s.Create("/foo/z", "0", false, Permanent)
+	s.Create("/foo/y", "", false, Permanent)
+	s.Create("/foo/y/a", "0", false, Permanent)
+	s.Create("/foo/y/b", "0", false, Permanent)
+	e, err := s.Get("/foo", true, true)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.KVPairs[0].Key, "/foo/x", "")
 	assert.Equal(t, e.KVPairs[1].Key, "/foo/y", "")
@@ -87,7 +87,7 @@ func TestStoreGetSorted(t *testing.T) {
 // Ensure that the store can create a new key if it doesn't already exist.
 func TestStoreCreateValue(t *testing.T) {
 	s := newStore()
-	e, err := s.Create("/foo", "bar", false, Permanent, 2, 1)
+	e, err := s.Create("/foo", "bar", false, Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -97,14 +97,13 @@ func TestStoreCreateValue(t *testing.T) {
 	assert.Nil(t, e.KVPairs, "")
 	assert.Nil(t, e.Expiration, "")
 	assert.Equal(t, e.TTL, 0, "")
-	assert.Equal(t, e.Index, uint64(2), "")
-	assert.Equal(t, e.Term, uint64(1), "")
+	assert.Equal(t, e.ModifiedIndex, uint64(1), "")
 }
 
 // Ensure that the store can create a new directory if it doesn't already exist.
 func TestStoreCreateDirectory(t *testing.T) {
 	s := newStore()
-	e, err := s.Create("/foo", "", false, Permanent, 2, 1)
+	e, err := s.Create("/foo", "", false, Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -114,22 +113,21 @@ func TestStoreCreateDirectory(t *testing.T) {
 // Ensure that the store fails to create a key if it already exists.
 func TestStoreCreateFailsIfExists(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	e, _err := s.Create("/foo", "", false, Permanent, 3, 1)
+	s.Create("/foo", "", false, Permanent)
+	e, _err := s.Create("/foo", "", false, Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "")
 	assert.Equal(t, err.Message, "Already exists", "")
 	assert.Equal(t, err.Cause, "/foo", "")
-	assert.Equal(t, err.Index, uint64(3), "")
-	assert.Equal(t, err.Term, uint64(1), "")
+	assert.Equal(t, err.Index, uint64(1), "")
 	assert.Nil(t, e, 0, "")
 }
 
 // Ensure that the store can update a key if it already exists.
 func TestStoreUpdateValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, err := s.Update("/foo", "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.Update("/foo", "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -137,17 +135,16 @@ func TestStoreUpdateValue(t *testing.T) {
 	assert.Equal(t, e.PrevValue, "bar", "")
 	assert.Equal(t, e.Value, "baz", "")
 	assert.Equal(t, e.TTL, 0, "")
-	assert.Equal(t, e.Index, uint64(3), "")
-	assert.Equal(t, e.Term, uint64(1), "")
-	e, _ = s.Get("/foo", false, false, 3, 1)
+	assert.Equal(t, e.ModifiedIndex, uint64(2), "")
+	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "baz", "")
 }
 
 // Ensure that the store cannot update a directory.
 func TestStoreUpdateFailsIfDirectory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	e, _err := s.Update("/foo", "baz", Permanent, 3, 1)
+	s.Create("/foo", "", false, Permanent)
+	e, _err := s.Update("/foo", "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
 	assert.Equal(t, err.Message, "Not A File", "")
@@ -158,13 +155,20 @@ func TestStoreUpdateFailsIfDirectory(t *testing.T) {
 // Ensure that the store can update the TTL on a value.
 func TestStoreUpdateValueTTL(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	_, err := s.Update("/foo", "baz", time.Now().Add(1*time.Millisecond), 3, 1)
-	e, _ := s.Get("/foo", false, false, 3, 1)
+
+	c := make(chan bool)
+	defer func() {
+		c <- true
+	}()
+	go mockSyncService(s.DeleteExpiredKeys, c)
+
+	s.Create("/foo", "bar", false, Permanent)
+	_, err := s.Update("/foo", "baz", time.Now().Add(500*time.Millisecond))
+	e, _ := s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "baz", "")
 
-	time.Sleep(2 * time.Millisecond)
-	e, err = s.Get("/foo", false, false, 3, 1)
+	time.Sleep(600 * time.Millisecond)
+	e, err = s.Get("/foo", false, false)
 	assert.Nil(t, e, "")
 	assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
 }
@@ -172,14 +176,21 @@ func TestStoreUpdateValueTTL(t *testing.T) {
 // Ensure that the store can update the TTL on a directory.
 func TestStoreUpdateDirTTL(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	s.Create("/foo/bar", "baz", false, Permanent, 3, 1)
-	_, err := s.Update("/foo", "", time.Now().Add(1*time.Millisecond), 3, 1)
-	e, _ := s.Get("/foo/bar", false, false, 3, 1)
+
+	c := make(chan bool)
+	defer func() {
+		c <- true
+	}()
+	go mockSyncService(s.DeleteExpiredKeys, c)
+
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo/bar", "baz", false, Permanent)
+	_, err := s.Update("/foo", "", time.Now().Add(500*time.Millisecond))
+	e, _ := s.Get("/foo/bar", false, false)
 	assert.Equal(t, e.Value, "baz", "")
 
-	time.Sleep(2 * time.Millisecond)
-	e, err = s.Get("/foo/bar", false, false, 3, 1)
+	time.Sleep(600 * time.Millisecond)
+	e, err = s.Get("/foo/bar", false, false)
 	assert.Nil(t, e, "")
 	assert.Equal(t, err.(*etcdErr.Error).ErrorCode, etcdErr.EcodeKeyNotFound, "")
 }
@@ -187,8 +198,8 @@ func TestStoreUpdateDirTTL(t *testing.T) {
 // Ensure that the store can delete a value.
 func TestStoreDeleteValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, err := s.Delete("/foo", false, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.Delete("/foo", false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "delete", "")
 }
@@ -196,8 +207,8 @@ func TestStoreDeleteValue(t *testing.T) {
 // Ensure that the store can delete a directory if recursive is specified.
 func TestStoreDeleteDiretory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	e, err := s.Delete("/foo", true, 3, 1)
+	s.Create("/foo", "", false, Permanent)
+	e, err := s.Delete("/foo", true)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "delete", "")
 }
@@ -205,8 +216,8 @@ func TestStoreDeleteDiretory(t *testing.T) {
 // Ensure that the store cannot delete a directory if recursive is not specified.
 func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	e, _err := s.Delete("/foo", false, 3, 1)
+	s.Create("/foo", "", false, Permanent)
+	e, _err := s.Delete("/foo", false)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
 	assert.Equal(t, err.Message, "Not A File", "")
@@ -216,60 +227,60 @@ func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
 // Ensure that the store can conditionally update a key if it has a previous value.
 func TestStoreCompareAndSwapPrevValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.PrevValue, "bar", "")
 	assert.Equal(t, e.Value, "baz", "")
-	e, _ = s.Get("/foo", false, false, 3, 1)
+	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "baz", "")
 }
 
 // Ensure that the store cannot conditionally update a key if it has the wrong previous value.
 func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
 	assert.Equal(t, err.Message, "Test Failed", "")
 	assert.Nil(t, e, "")
-	e, _ = s.Get("/foo", false, false, 3, 1)
+	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "bar", "")
 }
 
 // Ensure that the store can conditionally update a key if it has a previous index.
 func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, err := s.CompareAndSwap("/foo", "", 2, "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.PrevValue, "bar", "")
 	assert.Equal(t, e.Value, "baz", "")
-	e, _ = s.Get("/foo", false, false, 3, 1)
+	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "baz", "")
 }
 
 // Ensure that the store cannot conditionally update a key if it has the wrong previous index.
 func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
 	assert.Equal(t, err.Message, "Test Failed", "")
 	assert.Nil(t, e, "")
-	e, _ = s.Get("/foo", false, false, 3, 1)
+	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Value, "bar", "")
 }
 
 // Ensure that the store can watch for key creation.
 func TestStoreWatchCreate(t *testing.T) {
 	s := newStore()
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
+	c, _ := s.Watch("/foo", false, 0)
+	s.Create("/foo", "bar", false, Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -280,8 +291,8 @@ func TestStoreWatchCreate(t *testing.T) {
 // Ensure that the store can watch for recursive key creation.
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 	s := newStore()
-	c, _ := s.Watch("/foo", true, 0, 0, 1)
-	s.Create("/foo/bar", "baz", false, Permanent, 2, 1)
+	c, _ := s.Watch("/foo", true, 0)
+	s.Create("/foo/bar", "baz", false, Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Key, "/foo/bar", "")
@@ -290,9 +301,9 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
 // Ensure that the store can watch for key updates.
 func TestStoreWatchUpdate(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
-	s.Update("/foo", "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	c, _ := s.Watch("/foo", false, 0)
+	s.Update("/foo", "baz", Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -301,9 +312,9 @@ func TestStoreWatchUpdate(t *testing.T) {
 // Ensure that the store can watch for recursive key updates.
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", true, 0, 0, 1)
-	s.Update("/foo/bar", "baz", Permanent, 3, 1)
+	s.Create("/foo/bar", "baz", false, Permanent)
+	c, _ := s.Watch("/foo", true, 0)
+	s.Update("/foo/bar", "baz", Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "update", "")
 	assert.Equal(t, e.Key, "/foo/bar", "")
@@ -312,9 +323,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
 // Ensure that the store can watch for key deletions.
 func TestStoreWatchDelete(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
-	s.Delete("/foo", false, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	c, _ := s.Watch("/foo", false, 0)
+	s.Delete("/foo", false)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -323,9 +334,9 @@ func TestStoreWatchDelete(t *testing.T) {
 // Ensure that the store can watch for recursive key deletions.
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", true, 0, 0, 1)
-	s.Delete("/foo/bar", false, 3, 1)
+	s.Create("/foo/bar", "baz", false, Permanent)
+	c, _ := s.Watch("/foo", true, 0)
+	s.Delete("/foo/bar", false)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Key, "/foo/bar", "")
@@ -334,9 +345,9 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
 // Ensure that the store can watch for CAS updates.
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
-	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent, 3, 1)
+	s.Create("/foo", "bar", false, Permanent)
+	c, _ := s.Watch("/foo", false, 0)
+	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Key, "/foo", "")
@@ -345,9 +356,9 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
 // Ensure that the store can watch for recursive CAS updates.
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent, 2, 1)
-	c, _ := s.Watch("/foo", true, 0, 0, 1)
-	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent, 3, 1)
+	s.Create("/foo/bar", "baz", false, Permanent)
+	c, _ := s.Watch("/foo", true, 0)
+	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "compareAndSwap", "")
 	assert.Equal(t, e.Key, "/foo/bar", "")
@@ -356,32 +367,45 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 // Ensure that the store can watch for key expiration.
 func TestStoreWatchExpire(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, time.Now().Add(1*time.Millisecond), 2, 1)
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
+
+	stopChan := make(chan bool)
+	defer func() {
+		stopChan <- true
+	}()
+	go mockSyncService(s.DeleteExpiredKeys, stopChan)
+
+	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
+	s.Create("/foofoo", "barbarbar", false, time.Now().Add(500*time.Millisecond))
+
+	c, _ := s.Watch("/", true, 0)
 	e := nbselect(c)
 	assert.Nil(t, e, "")
-	time.Sleep(2 * time.Millisecond)
+	time.Sleep(600 * time.Millisecond)
 	e = nbselect(c)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Key, "/foo", "")
+	c, _ = s.Watch("/", true, 4)
+	e = nbselect(c)
+	assert.Equal(t, e.Action, "expire", "")
+	assert.Equal(t, e.Key, "/foofoo", "")
 }
 
 // Ensure that the store can recover from a previously saved state.
 func TestStoreRecover(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	s.Create("/foo/x", "bar", false, Permanent, 3, 1)
-	s.Create("/foo/y", "baz", false, Permanent, 4, 1)
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo/x", "bar", false, Permanent)
+	s.Create("/foo/y", "baz", false, Permanent)
 	b, err := s.Save()
 
 	s2 := newStore()
 	s2.Recovery(b)
 
-	e, err := s.Get("/foo/x", false, false, 4, 1)
+	e, err := s.Get("/foo/x", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Value, "bar", "")
 
-	e, err = s.Get("/foo/y", false, false, 4, 1)
+	e, err = s.Get("/foo/y", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Value, "baz", "")
 }
@@ -389,21 +413,37 @@ func TestStoreRecover(t *testing.T) {
 // Ensure that the store can recover from a previously saved state that includes an expiring key.
 func TestStoreRecoverWithExpiration(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent, 2, 1)
-	s.Create("/foo/x", "bar", false, Permanent, 3, 1)
-	s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond), 4, 1)
+
+	c := make(chan bool)
+	defer func() {
+		c <- true
+	}()
+	go mockSyncService(s.DeleteExpiredKeys, c)
+
+	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo/x", "bar", false, Permanent)
+	s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond))
 	b, err := s.Save()
 
 	time.Sleep(10 * time.Millisecond)
 
 	s2 := newStore()
+
+	c2 := make(chan bool)
+	defer func() {
+		c2 <- true
+	}()
+	go mockSyncService(s2.DeleteExpiredKeys, c2)
+
 	s2.Recovery(b)
 
-	e, err := s.Get("/foo/x", false, false, 4, 1)
+	time.Sleep(600 * time.Millisecond)
+
+	e, err := s.Get("/foo/x", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Value, "bar", "")
 
-	e, err = s.Get("/foo/y", false, false, 4, 1)
+	e, err = s.Get("/foo/y", false, false)
 	assert.NotNil(t, err, "")
 	assert.Nil(t, e, "")
 }
@@ -417,3 +457,15 @@ func nbselect(c <-chan *Event) *Event {
 		return nil
 	}
 }
+
+func mockSyncService(f func(now time.Time), c chan bool) {
+	ticker := time.Tick(time.Millisecond * 500)
+	for {
+		select {
+		case <-c:
+			return
+		case now := <-ticker:
+			f(now)
+		}
+	}
+}

+ 81 - 0
store/ttl_key_heap.go

@@ -0,0 +1,81 @@
+package store
+
+import (
+	"container/heap"
+)
+
+// An TTLKeyHeap is a min-heap of TTLKeys order by expiration time
+type ttlKeyHeap struct {
+	array  []*Node
+	keyMap map[*Node]int
+}
+
+func newTtlKeyHeap() *ttlKeyHeap {
+	h := &ttlKeyHeap{keyMap: make(map[*Node]int)}
+	heap.Init(h)
+	return h
+}
+
+func (h ttlKeyHeap) Len() int {
+	return len(h.array)
+}
+
+func (h ttlKeyHeap) Less(i, j int) bool {
+	return h.array[i].ExpireTime.Before(h.array[j].ExpireTime)
+}
+
+func (h ttlKeyHeap) Swap(i, j int) {
+	// swap node
+	h.array[i], h.array[j] = h.array[j], h.array[i]
+
+	// update map
+	h.keyMap[h.array[i]] = i
+	h.keyMap[h.array[j]] = j
+}
+
+func (h *ttlKeyHeap) Push(x interface{}) {
+	n, _ := x.(*Node)
+	h.keyMap[n] = len(h.array)
+	h.array = append(h.array, n)
+}
+
+func (h *ttlKeyHeap) Pop() interface{} {
+	old := h.array
+	n := len(old)
+	x := old[n-1]
+	h.array = old[0 : n-1]
+	delete(h.keyMap, x)
+	return x
+}
+
+func (h *ttlKeyHeap) top() *Node {
+	if h.Len() != 0 {
+		return h.array[0]
+	}
+	return nil
+}
+
+func (h *ttlKeyHeap) pop() *Node {
+	x := heap.Pop(h)
+	n, _ := x.(*Node)
+	return n
+}
+
+func (h *ttlKeyHeap) push(x interface{}) {
+	heap.Push(h, x)
+}
+
+func (h *ttlKeyHeap) update(n *Node) {
+	index, ok := h.keyMap[n]
+	if ok {
+		heap.Remove(h, index)
+		heap.Push(h, n)
+	}
+}
+
+func (h *ttlKeyHeap) remove(n *Node) {
+	index, ok := h.keyMap[n]
+	if ok {
+		heap.Remove(h, index)
+	}
+}

+ 15 - 9
store/v2/command_factory.go

@@ -2,7 +2,7 @@ package v2
 
 import (
 	"time"
-	
+
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 )
@@ -28,8 +28,8 @@ func (f *CommandFactory) CreateUpgradeCommand() raft.Command {
 // CreateSetCommand creates a version 2 command to set a key to a given value in the store.
 func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
 	return &SetCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
 	}
 }
@@ -37,18 +37,18 @@ func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime t
 // CreateCreateCommand creates a version 2 command to create a new key in the store.
 func (f *CommandFactory) CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command {
 	return &CreateCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
-		Unique: unique,
+		Unique:     unique,
 	}
 }
 
 // CreateUpdateCommand creates a version 2 command to update a key to a given value in the store.
 func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command {
 	return &UpdateCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
 	}
 }
@@ -56,7 +56,7 @@ func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTim
 // CreateDeleteCommand creates a version 2 command to delete a key from the store.
 func (f *CommandFactory) CreateDeleteCommand(key string, recursive bool) raft.Command {
 	return &DeleteCommand{
-		Key: key,
+		Key:       key,
 		Recursive: recursive,
 	}
 }
@@ -71,3 +71,9 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
 		ExpireTime: expireTime,
 	}
 }
+
+func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
+	return &SyncCommand{
+		Time: time.Now(),
+	}
+}

+ 1 - 2
store/v2/compare_and_swap_command.go

@@ -30,8 +30,7 @@ func (c *CompareAndSwapCommand) CommandName() string {
 func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex,
-		c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 1 - 1
store/v2/create_command.go

@@ -29,7 +29,7 @@ func (c *CreateCommand) CommandName() string {
 func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 2 - 2
store/v2/delete_command.go

@@ -1,8 +1,8 @@
 package v2
 
 import (
-	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 )
 
@@ -25,7 +25,7 @@ func (c *DeleteCommand) CommandName() string {
 func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
+	e, err := s.Delete(c.Key, c.Recursive)
 
 	if err != nil {
 		log.Debug(err)

+ 1 - 1
store/v2/set_command.go

@@ -29,7 +29,7 @@ func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
 	// create a new node or replace the old node.
-	e, err := s.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := s.Set(c.Key, c.Value, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 28 - 0
store/v2/sync_command.go

@@ -0,0 +1,28 @@
+package v2
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+func init() {
+	raft.RegisterCommand(&SyncCommand{})
+}
+
+type SyncCommand struct {
+	Time time.Time `json:"time"`
+}
+
+// The name of the Sync command in the log
+func (c SyncCommand) CommandName() string {
+	return "etcd:sync"
+}
+
+func (c SyncCommand) Apply(server raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(store.Store)
+	s.DeleteExpiredKeys(c.Time)
+
+	return nil, nil
+}

+ 1 - 1
store/v2/update_command.go

@@ -27,7 +27,7 @@ func (c *UpdateCommand) CommandName() string {
 func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := s.Update(c.Key, c.Value, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 1 - 2
store/watcher.go

@@ -40,8 +40,7 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	// at the file we need to delete.
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// should get notified even if "/foo" is not the path it is watching.
-
-	if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
+	if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
 		w.eventChan <- e
 		return true
 	}

+ 10 - 15
store/watcher_hub.go

@@ -37,23 +37,24 @@ func newWatchHub(capacity int) *watcherHub {
 // If recursive is false, the first change after index at prefix will be sent to the event channel.
 // If index is zero, watch will start from the current index + 1.
 func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
-	eventChan := make(chan *Event, 1)
-
-	e, err := wh.EventHistory.scan(prefix, index)
+	event, err := wh.EventHistory.scan(prefix, index)
 
 	if err != nil {
 		return nil, err
 	}
 
-	if e != nil {
-		eventChan <- e
+	eventChan := make(chan *Event, 1) // use a buffered channel
+
+	if event != nil {
+		eventChan <- event
+
 		return eventChan, nil
 	}
 
 	w := &watcher{
 		eventChan:  eventChan,
 		recursive:  recursive,
-		sinceIndex: index - 1, // to catch Expire()
+		sinceIndex: index,
 	}
 
 	l, ok := wh.watchers[prefix]
@@ -93,19 +94,16 @@ func (wh *watcherHub) notify(e *Event) {
 
 func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 	l, ok := wh.watchers[path]
-
 	if ok {
 		curr := l.Front()
-		notifiedAll := true
 
 		for {
 			if curr == nil { // we have reached the end of the list
-				if notifiedAll {
+				if l.Len() == 0 {
 					// if we have notified all watcher in the list
 					// we can delete the list
 					delete(wh.watchers, path)
 				}
-
 				break
 			}
 
@@ -114,16 +112,13 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 			w, _ := curr.Value.(*watcher)
 
 			if w.notify(e, e.Key == path, deleted) {
+
 				// if we successfully notify a watcher
 				// we need to remove the watcher from the list
 				// and decrease the counter
-
 				l.Remove(curr)
 				atomic.AddInt64(&wh.count, -1)
-			} else {
-				// once there is a watcher in the list is not interested
-				// in the event, we should keep the list in the map
-				notifiedAll = false
+
 			}
 
 			curr = next // update current to the next

+ 3 - 3
store/watcher_test.go

@@ -35,7 +35,7 @@ func TestWatcher(t *testing.T) {
 		// do nothing
 	}
 
-	e := newEvent(Create, "/foo/bar", 1, 1)
+	e := newEvent(Create, "/foo/bar", 1)
 
 	wh.notify(e)
 
@@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) {
 
 	c, _ = wh.watch("/foo", false, 2)
 
-	e = newEvent(Create, "/foo/bar", 2, 1)
+	e = newEvent(Create, "/foo/bar", 2)
 
 	wh.notify(e)
 
@@ -58,7 +58,7 @@ func TestWatcher(t *testing.T) {
 		// do nothing
 	}
 
-	e = newEvent(Create, "/foo", 3, 1)
+	e = newEvent(Create, "/foo", 3)
 
 	wh.notify(e)
 

+ 2 - 3
tests/functional/multi_node_kill_all_and_recovery_test.go

@@ -65,8 +65,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
 		t.Fatalf("Recovery error: %s", err)
 	}
 
-	if result.Index != 18 {
-		t.Fatalf("recovery failed! [%d/18]", result.Index)
+	if result.Index != 16 {
+		t.Fatalf("recovery failed! [%d/16]", result.Index)
 	}
 }
-

+ 3 - 0
tests/functional/remove_node_test.go

@@ -1,6 +1,7 @@
 package test
 
 import (
+	"fmt"
 	"net/http"
 	"os"
 	"testing"
@@ -31,6 +32,7 @@ func TestRemoveNode(t *testing.T) {
 		for i := 0; i < 2; i++ {
 			client.Do(rmReq)
 
+			fmt.Println("send remove to node3 and wait for its exiting")
 			etcds[2].Wait()
 
 			resp, err := c.Get("_etcd/machines")
@@ -71,6 +73,7 @@ func TestRemoveNode(t *testing.T) {
 		// first kill the node, then remove it, then add it back
 		for i := 0; i < 2; i++ {
 			etcds[2].Kill()
+			fmt.Println("kill node3 and wait for its exiting")
 			etcds[2].Wait()
 
 			client.Do(rmReq)

+ 9 - 4
tests/functional/simple_snapshot_test.go

@@ -3,6 +3,7 @@ package test
 import (
 	"io/ioutil"
 	"os"
+	"strconv"
 	"testing"
 	"time"
 
@@ -52,8 +53,10 @@ func TestSimpleSnapshot(t *testing.T) {
 		t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
 	}
 
-	if snapshots[0].Name() != "0_503.ss" {
-		t.Fatal("wrong name of snapshot :[0_503.ss/", snapshots[0].Name(), "]")
+	index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
+
+	if index < 507 || index > 510 {
+		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
 	}
 
 	// issue second 501 commands
@@ -82,7 +85,9 @@ func TestSimpleSnapshot(t *testing.T) {
 		t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
 	}
 
-	if snapshots[0].Name() != "0_1004.ss" {
-		t.Fatal("wrong name of snapshot :[0_1004.ss/", snapshots[0].Name(), "]")
+	index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
+
+	if index < 1015 || index > 1018 {
+		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
 	}
 }