Browse Source

Merge remote-tracking branch 'xiangli-cmu/newStore' into 0.2

Brandon Philips 12 years ago
parent
commit
b8e5794765
8 changed files with 381 additions and 302 deletions
  1. 3 1
      etcd_handlers.go
  2. 8 29
      store/event.go
  3. 24 0
      store/kv_pairs.go
  4. 159 111
      store/node.go
  5. 26 46
      store/store.go
  6. 0 4
      store/store_test.go
  7. 19 111
      store/watcher.go
  8. 142 0
      store/watcher_hub.go

+ 3 - 1
etcd_handlers.go

@@ -24,12 +24,14 @@ func NewEtcdMuxer() *http.ServeMux {
 	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
 	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
+	etcdMux.HandleFunc("/test/", TestHttpHandler)
 
+	// backward compatibility: keep serving the v1 endpoints
 	etcdMux.Handle("/v1/keys/", errorHandler(MultiplexerV1))
 	etcdMux.Handle("/v1/leader", errorHandler(LeaderHttpHandler))
 	etcdMux.Handle("/v1/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/v1/stats/", errorHandler(StatsHttpHandler))
-	etcdMux.HandleFunc("/test/", TestHttpHandler)
+
 	return etcdMux
 }
 

+ 8 - 29
store/event.go

@@ -24,40 +24,19 @@ const (
 )
 
 type Event struct {
-	Action     string         `json:"action"`
-	Key        string         `json:"key, omitempty"`
-	Dir        bool           `json:"dir,omitempty"`
-	PrevValue  string         `json:"prevValue,omitempty"`
-	Value      string         `json:"value,omitempty"`
-	KVPairs    []KeyValuePair `json:"kvs,omitempty"`
-	Expiration *time.Time     `json:"expiration,omitempty"`
-	TTL        int64          `json:"ttl,omitempty"` // Time to live in second
+	Action     string     `json:"action"`
+	Key        string     `json:"key, omitempty"`
+	Dir        bool       `json:"dir,omitempty"`
+	PrevValue  string     `json:"prevValue,omitempty"`
+	Value      string     `json:"value,omitempty"`
+	KVPairs    kvPairs    `json:"kvs,omitempty"`
+	Expiration *time.Time `json:"expiration,omitempty"`
+	TTL        int64      `json:"ttl,omitempty"` // Time to live in second
 	// The command index of the raft machine when the command is executed
 	Index uint64 `json:"index"`
 	Term  uint64 `json:"term"`
 }
 
-// When user list a directory, we add all the node into key-value pair slice
-type KeyValuePair struct {
-	Key     string         `json:"key, omitempty"`
-	Value   string         `json:"value,omitempty"`
-	Dir     bool           `json:"dir,omitempty"`
-	KVPairs []KeyValuePair `json:"kvs,omitempty"`
-}
-
-// interfaces for sorting
-func (k KeyValuePair) Len() int {
-	return len(k.KVPairs)
-}
-
-func (k KeyValuePair) Less(i, j int) bool {
-	return k.KVPairs[i].Key < k.KVPairs[j].Key
-}
-
-func (k KeyValuePair) Swap(i, j int) {
-	k.KVPairs[i], k.KVPairs[j] = k.KVPairs[j], k.KVPairs[i]
-}
-
 func newEvent(action string, key string, index uint64, term uint64) *Event {
 	return &Event{
 		Action: action,

+ 24 - 0
store/kv_pairs.go

@@ -0,0 +1,24 @@
+package store
+
+// When user list a directory, we add all the node into key-value pair slice
+type KeyValuePair struct {
+	Key     string  `json:"key, omitempty"`
+	Value   string  `json:"value,omitempty"`
+	Dir     bool    `json:"dir,omitempty"`
+	KVPairs kvPairs `json:"kvs,omitempty"`
+}
+
+type kvPairs []KeyValuePair
+
+// interfaces for sorting
+func (kvs kvPairs) Len() int {
+	return len(kvs)
+}
+
+func (kvs kvPairs) Less(i, j int) bool {
+	return kvs[i].Key < kvs[j].Key
+}
+
+func (kvs kvPairs) Swap(i, j int) {
+	kvs[i], kvs[j] = kvs[j], kvs[i]
+}

+ 159 - 111
store/node.go

@@ -3,6 +3,7 @@ package store
 import (
 	"path"
 	"sort"
+	"sync"
 	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
@@ -17,22 +18,37 @@ const (
 	removed
 )
 
+// Node is the basic element in the store system.
+// A key-value pair will have a string value
+// A directory will have a children map
 type Node struct {
-	Path          string
+	Path string
+
 	CreateIndex   uint64
 	CreateTerm    uint64
 	ModifiedIndex uint64
 	ModifiedTerm  uint64
-	Parent        *Node `json:"-"`
-	ExpireTime    time.Time
-	ACL           string
-	Value         string           // for key-value pair
-	Children      map[string]*Node // for directory
-	status        int
-	stopExpire    chan bool // stop expire routine channel
+
+	Parent *Node `json:"-"` // should not encode this field! avoid cyclical dependency.
+
+	ExpireTime time.Time
+	ACL        string
+	Value      string           // for key-value pair
+	Children   map[string]*Node // for directory
+
+	// a ttl node will have an expire routine associated with it.
+	// we need a channel to stop that routine when the expiration changes.
+	stopExpire chan bool
+
+	// ensure we only delete the node once
+	// expire and remove may try to delete a node twice
+	once sync.Once
 }
 
-func newFile(nodePath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+// newKV creates a Key-Value pair
+func newKV(nodePath string, value string, createIndex uint64,
+	createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+
 	return &Node{
 		Path:          nodePath,
 		CreateIndex:   createIndex,
@@ -47,7 +63,10 @@ func newFile(nodePath string, value string, createIndex uint64, createTerm uint6
 	}
 }
 
-func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+// newDir creates a directory
+func newDir(nodePath string, createIndex uint64, createTerm uint64,
+	parent *Node, ACL string, expireTime time.Time) *Node {
+
 	return &Node{
 		Path:        nodePath,
 		CreateIndex: createIndex,
@@ -60,56 +79,41 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node
 	}
 }
 
-// Remove function remove the node.
-// If the node is a directory and recursive is true, the function will recursively remove
-// add nodes under the receiver node.
-func (n *Node) Remove(recursive bool, callback func(path string)) *etcdErr.Error {
-	if n.status == removed { // check race between remove and expire
-		return nil
-	}
-
-	if !n.IsDir() { // file node: key-value pair
-		_, name := path.Split(n.Path)
-
-		if n.Parent != nil && n.Parent.Children[name] == n {
-			// This is the only pointer to Node object
-			// Handled by garbage collector
-			delete(n.Parent.Children, name)
-
-			if callback != nil {
-				callback(n.Path)
-			}
-
-			n.stopExpire <- true
-			n.status = removed
+// IsHidden function checks if the node is a hidden node. A hidden node
+// will begin with '_'
+// A hidden node will not be shown via get command under a directory
+// For example if we have /foo/_hidden and /foo/notHidden, get "/foo"
+// will only return /foo/notHidden
+func (n *Node) IsHidden() bool {
+	_, name := path.Split(n.Path)
 
-		}
+	return name[0] == '_'
+}
 
-		return nil
-	}
+// IsPermanent function checks if the node is a permanent one.
+func (n *Node) IsPermanent() bool {
+	return n.ExpireTime.Sub(Permanent) == 0
+}
 
-	if !recursive {
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
+// IsExpired function checks if the node has been expired.
+func (n *Node) IsExpired() (bool, time.Duration) {
+	if n.IsPermanent() {
+		return false, 0
 	}
 
-	for _, child := range n.Children { // delete all children
-		child.Remove(true, callback)
+	duration := n.ExpireTime.Sub(time.Now())
+	if duration <= 0 {
+		return true, 0
 	}
 
-	// delete self
-	_, name := path.Split(n.Path)
-	if n.Parent != nil && n.Parent.Children[name] == n {
-		delete(n.Parent.Children, name)
-
-		if callback != nil {
-			callback(n.Path)
-		}
-
-		n.stopExpire <- true
-		n.status = removed
-	}
+	return false, duration
+}
 
-	return nil
+// IsDir function checks whether the node is a directory.
+// If the node is a directory, the function will return true.
+// Otherwise the function will return false.
+func (n *Node) IsDir() bool {
+	return !(n.Children == nil)
 }
 
 // Read function gets the value of the node.
@@ -136,6 +140,13 @@ func (n *Node) Write(value string, index uint64, term uint64) *etcdErr.Error {
 	return nil
 }
 
+func (n *Node) ExpirationAndTTL() (*time.Time, int64) {
+	if n.ExpireTime.Sub(Permanent) != 0 {
+		return &n.ExpireTime, int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
+	}
+	return nil, 0
+}
+
 // List function return a slice of nodes under the receiver node.
 // If the receiver node is not a directory, a "Not A Directory" error will be returned.
 func (n *Node) List() ([]*Node, *etcdErr.Error) {
@@ -192,39 +203,64 @@ func (n *Node) Add(child *Node) *etcdErr.Error {
 	return nil
 }
 
-// Clone function clone the node recursively and return the new node.
-// If the node is a directory, it will clone all the content under this directory.
-// If the node is a key-value pair, it will clone the pair.
-func (n *Node) Clone() *Node {
-	if !n.IsDir() {
-		return newFile(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
-	}
+// Remove function remove the node.
+func (n *Node) Remove(recursive bool, callback func(path string)) *etcdErr.Error {
 
-	clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+	if n.IsDir() && !recursive {
+		// cannot delete a directory without setting recursive to true
+		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
+	}
 
-	for key, child := range n.Children {
-		clone.Children[key] = child.Clone()
+	onceBody := func() {
+		n.internalRemove(recursive, callback)
 	}
 
-	return clone
+	// this function might be entered multiple times by expire and delete
+	// every node will only be deleted once.
+	n.once.Do(onceBody)
+
+	return nil
 }
 
-func (n *Node) recoverAndclean(s *Store) {
-	if n.IsDir() {
-		for _, child := range n.Children {
-			child.Parent = n
-			child.recoverAndclean(s)
+// internalRemove function will be called by remove()
+func (n *Node) internalRemove(recursive bool, callback func(path string)) {
+	if !n.IsDir() { // key-value pair
+		_, name := path.Split(n.Path)
+
+		// find its parent and remove the node from the map
+		if n.Parent != nil && n.Parent.Children[name] == n {
+			delete(n.Parent.Children, name)
+		}
+
+		if callback != nil {
+			callback(n.Path)
 		}
+
+		// the stop channel has a buffer. just send to it!
+		n.stopExpire <- true
+		return
 	}
 
-	n.stopExpire = make(chan bool, 1)
+	for _, child := range n.Children { // delete all children
+		child.Remove(true, callback)
+	}
 
-	n.Expire(s)
+	// delete self
+	_, name := path.Split(n.Path)
+	if n.Parent != nil && n.Parent.Children[name] == n {
+		delete(n.Parent.Children, name)
+
+		if callback != nil {
+			callback(n.Path)
+		}
+
+		n.stopExpire <- true
+	}
 }
 
 // Expire function will test if the node is expired.
 // if the node is already expired, delete the node and return.
-// if the node is permemant (this shouldn't happen), return at once.
+// if the node is permanent (this shouldn't happen), return at once.
 // else wait for a period time, then remove the node. and notify the watchhub.
 func (n *Node) Expire(s *Store) {
 	expired, duration := n.IsExpired()
@@ -250,14 +286,21 @@ func (n *Node) Expire(s *Store) {
 		// if timeout, delete the node
 		case <-time.After(duration):
 
+			// between the timer firing and this goroutine acquiring the lock,
+			// the expiration time of the node may have been updated.
+			// we have to check again once we hold the lock
 			s.worldLock.Lock()
 			defer s.worldLock.Unlock()
 
-			e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
-			s.WatcherHub.notify(e)
+			expired, _ := n.IsExpired()
 
-			n.Remove(true, nil)
-			s.Stats.Inc(ExpireCount)
+			if expired {
+				e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
+				s.WatcherHub.notify(e)
+
+				n.Remove(true, nil)
+				s.Stats.Inc(ExpireCount)
+			}
 
 			return
 
@@ -268,41 +311,6 @@ func (n *Node) Expire(s *Store) {
 	}()
 }
 
-// IsHidden function checks if the node is a hidden node. A hidden node
-// will begin with '_'
-// A hidden node will not be shown via get command under a directory
-// For example if we have /foo/_hidden and /foo/notHidden, get "/foo"
-// will only return /foo/notHidden
-func (n *Node) IsHidden() bool {
-	_, name := path.Split(n.Path)
-
-	return name[0] == '_'
-}
-
-func (n *Node) IsPermanent() bool {
-	return n.ExpireTime.Sub(Permanent) == 0
-}
-
-func (n *Node) IsExpired() (bool, time.Duration) {
-	if n.IsPermanent() {
-		return false, 0
-	}
-
-	duration := n.ExpireTime.Sub(time.Now())
-	if duration <= 0 {
-		return true, 0
-	}
-
-	return false, duration
-}
-
-// IsDir function checks whether the node is a directory.
-// If the node is a directory, the function will return true.
-// Otherwise the function will return false.
-func (n *Node) IsDir() bool {
-	return !(n.Children == nil)
-}
-
 func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 	if n.IsDir() {
 		pair := KeyValuePair{
@@ -335,7 +343,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		// eliminate hidden nodes
 		pair.KVPairs = pair.KVPairs[:i]
 		if sorted {
-			sort.Sort(pair)
+			sort.Sort(pair.KVPairs)
 		}
 
 		return pair
@@ -349,6 +357,9 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 
 func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
 	if !n.IsPermanent() {
+		// check if the node has been expired
+		// if the node is not expired, we need to stop the go routine associated with
+		// that node.
 		expired, _ := n.IsExpired()
 
 		if !expired {
@@ -361,3 +372,40 @@ func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
 		n.Expire(s)
 	}
 }
+
+// Clone function clone the node recursively and return the new node.
+// If the node is a directory, it will clone all the content under this directory.
+// If the node is a key-value pair, it will clone the pair.
+func (n *Node) Clone() *Node {
+	if !n.IsDir() {
+		return newKV(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+	}
+
+	clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+
+	for key, child := range n.Children {
+		clone.Children[key] = child.Clone()
+	}
+
+	return clone
+}
+
+// recoverAndclean function helps to do recovery.
+// Two things need to be done: 1. recover the structure; 2. delete expired nodes
+
+// If the node is a directory, it will help recover children's parent pointer and recursively
+// call this function on its children.
+// We check the expire last since we need to recover the whole structure first and add all the
+// notifications into the event history.
+func (n *Node) recoverAndclean(s *Store) {
+	if n.IsDir() {
+		for _, child := range n.Children {
+			child.Parent = n
+			child.recoverAndclean(s)
+		}
+	}
+
+	n.stopExpire = make(chan bool, 1)
+
+	n.Expire(s)
+}

+ 26 - 46
store/store.go

@@ -24,13 +24,16 @@ type Store struct {
 
 func New() *Store {
 	s := new(Store)
-	s.Root = newDir("/", 0, 0, nil, "", Permanent)
+	s.Root = newDir("/", UndefIndex, UndefTerm, nil, "", Permanent)
 	s.Stats = newStats()
 	s.WatcherHub = newWatchHub(1000)
 
 	return s
 }
 
+// Get function returns a get event.
+// If recursive is true, it will return all the content under the node path.
+// If sorted is true, it will sort the content by keys.
 func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
@@ -46,7 +49,7 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
 
 	e := newEvent(Get, nodePath, index, term)
 
-	if n.IsDir() { // node is dir
+	if n.IsDir() { // node is a directory
 		e.Dir = true
 
 		children, _ := n.List()
@@ -57,35 +60,26 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
 		i := 0
 
 		for _, child := range children {
-
-			if child.IsHidden() { // get will not list hidden node
+			if child.IsHidden() { // get will not return hidden nodes
 				continue
 			}
 
 			e.KVPairs[i] = child.Pair(recursive, sorted)
-
 			i++
 		}
 
 		// eliminate hidden nodes
 		e.KVPairs = e.KVPairs[:i]
 
-		rootPairs := KeyValuePair{
-			KVPairs: e.KVPairs,
-		}
-
 		if sorted {
-			sort.Sort(rootPairs)
+			sort.Sort(e.KVPairs)
 		}
 
-	} else { // node is file
-		e.Value = n.Value
+	} else { // node is a file
+		e.Value, _ = n.Read()
 	}
 
-	if n.ExpireTime.Sub(Permanent) != 0 {
-		e.Expiration = &n.ExpireTime
-		e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
-	}
+	e.Expiration, e.TTL = n.ExpirationAndTTL()
 
 	s.Stats.Inc(GetSuccess)
 
@@ -107,7 +101,7 @@ func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, fo
 // Update function updates the value/ttl of the node.
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
-func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	nodePath = path.Clean(path.Join("/", nodePath))
@@ -116,37 +110,27 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(UpdateFail)
-
 		return nil, err
 	}
 
 	e := newEvent(Update, nodePath, s.Index, s.Term)
 
-	if n.IsDir() { // if the node is a directory, we can only update ttl
-		if len(value) != 0 {
+	if len(newValue) != 0 {
+		if n.IsDir() {
+			// if the node is a directory, we cannot update value
 			s.Stats.Inc(UpdateFail)
-
-			err := etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
-			return nil, err
+			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
 		}
 
-	} else { // if the node is a file, we can update value and ttl
 		e.PrevValue = n.Value
-
-		if len(value) != 0 {
-			e.Value = value
-		}
-
-		n.Write(value, index, term)
+		n.Write(newValue, index, term)
 	}
 
 	// update ttl
 	n.UpdateTTL(expireTime, s)
 
-	if n.ExpireTime.Sub(Permanent) != 0 {
-		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
-	}
+	e.Expiration, e.TTL = n.ExpirationAndTTL()
+
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(UpdateSuccess)
@@ -179,18 +163,15 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 	}
 
 	if n.Value == prevValue || n.ModifiedIndex == prevIndex {
-		// if test succeed, write the value
 		e := newEvent(TestAndSet, nodePath, index, term)
 		e.PrevValue = n.Value
-		e.Value = value
-		n.Write(value, index, term)
 
+		// if test succeed, write the value
+		n.Write(value, index, term)
 		n.UpdateTTL(expireTime, s)
 
-		if n.ExpireTime.Sub(Permanent) != 0 {
-			e.Expiration = &n.ExpireTime
-			e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
-		}
+		e.Value = value
+		e.Expiration, e.TTL = n.ExpirationAndTTL()
 
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(TestAndSetSuccess)
@@ -226,7 +207,8 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6
 	}
 
 	callback := func(path string) { // notify function
-		s.WatcherHub.notifyWithPath(e, path, true)
+		// notify the watchers with deleted set to true
+		s.WatcherHub.notifyWatchers(e, path, true)
 	}
 
 	err = n.Remove(recursive, callback)
@@ -319,7 +301,6 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
-		fmt.Println("1: bad create")
 		return nil, err
 	}
 
@@ -343,7 +324,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 	if len(value) != 0 { // create file
 		e.Value = value
 
-		n = newFile(nodePath, value, s.Index, s.Term, d, "", expireTime)
+		n = newKV(nodePath, value, s.Index, s.Term, d, "", expireTime)
 
 	} else { // create directory
 		e.Dir = true
@@ -362,8 +343,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 	// Node with TTL
 	if expireTime.Sub(Permanent) != 0 {
 		n.Expire(s)
-		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
+		e.Expiration, e.TTL = n.ExpirationAndTTL()
 	}
 
 	s.WatcherHub.notify(e)

+ 0 - 4
store/store_test.go

@@ -124,10 +124,6 @@ func TestUpdateFile(t *testing.T) {
 		t.Fatalf("cannot get sub dir before expiration [%s]", err.Error())
 	}
 
-	/*if e.KVPairs[2].Key != "/foo/foo/foo2/boo" || e.KVPairs[2].Value != "boo1" {
-		t.Fatalf("cannot get sub node of sub dir before expiration [%s]", err.Error())
-	}*/
-
 	// wait for expiration
 	time.Sleep(time.Second * 3)
 	e, err = s.Get("/foo/foo", true, false, 7, 1)

+ 19 - 111
store/watcher.go

@@ -1,125 +1,33 @@
 package store
 
-import (
-	"container/list"
-	"path"
-	"strings"
-	"sync/atomic"
-
-	etcdErr "github.com/coreos/etcd/error"
-)
-
-type watcherHub struct {
-	watchers     map[string]*list.List
-	count        int64 // current number of watchers.
-	EventHistory *EventHistory
-}
-
 type watcher struct {
 	eventChan  chan *Event
 	recursive  bool
 	sinceIndex uint64
 }
 
-func newWatchHub(capacity int) *watcherHub {
-	return &watcherHub{
-		watchers:     make(map[string]*list.List),
-		EventHistory: newEventHistory(capacity),
-	}
-}
-
-// watch function returns an Event channel.
-// If recursive is true, the first change after index under prefix will be sent to the event channel.
-// If recursive is false, the first change after index at prefix will be sent to the event channel.
-// If index is zero, watch will start from the current index + 1.
-func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
-	eventChan := make(chan *Event, 1)
-
-	e, err := wh.EventHistory.scan(prefix, index)
-
-	if err != nil {
-		return nil, err
-	}
-
-	if e != nil {
-		eventChan <- e
-		return eventChan, nil
-	}
-
-	w := &watcher{
-		eventChan:  eventChan,
-		recursive:  recursive,
-		sinceIndex: index - 1, // to catch Expire()
-	}
-
-	l, ok := wh.watchers[prefix]
-
-	if ok { // add the new watcher to the back of the list
-		l.PushBack(w)
-
-	} else { // create a new list and add the new watcher
-		l := list.New()
-		l.PushBack(w)
-		wh.watchers[prefix] = l
-	}
-
-	atomic.AddInt64(&wh.count, 1)
-
-	return eventChan, nil
-}
-
-func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
-	l, ok := wh.watchers[path]
-
-	if ok {
-		curr := l.Front()
-		notifiedAll := true
-
-		for {
-			if curr == nil { // we have reached the end of the list
-				if notifiedAll {
-					// if we have notified all watcher in the list
-					// we can delete the list
-					delete(wh.watchers, path)
-				}
-
-				break
-			}
-
-			next := curr.Next() // save the next
+// notify function notifies the watcher. If the watcher is interested in the given path,
+// the function will return true.
+func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
+	// the watcher is interested in the path in three cases and under one condition
+	// the condition is that the event happens at or after the watcher's sinceIndex
 
-			w, _ := curr.Value.(*watcher)
-			if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
-				w.eventChan <- e
-				l.Remove(curr)
-				atomic.AddInt64(&wh.count, -1)
-			} else {
-				notifiedAll = false
-			}
+	// 1. the path at which the event happens is the path the watcher is watching at.
+	// For example if the watcher is watching at "/foo" and the event happens at "/foo",
+	// the watcher must be interested in that event.
 
-			curr = next // go to the next one
-		}
-	}
-}
-
-func (wh *watcherHub) notify(e *Event) {
-	e = wh.EventHistory.addEvent(e)
-
-	segments := strings.Split(e.Key, "/")
-
-	currPath := "/"
-
-	// walk through all the paths
-	for _, segment := range segments {
-		currPath = path.Join(currPath, segment)
-		wh.notifyWithPath(e, currPath, false)
-	}
-}
+	// 2. the watcher is a recursive watcher, so it is interested in events that happen under
+	// its watching path. For example if watcher A watches at "/foo" and it is a recursive
+	// one, it will be interested in the event that happens at "/foo/bar".
 
-func (wh *watcherHub) clone() *watcherHub {
-	clonedHistory := wh.EventHistory.clone()
+	// 3. when we delete a directory, we need to force notify all the watchers who watch
+	// at the files we need to delete.
+	// For example a watcher is watching at "/foo/bar". And we delete "/foo". The watcher
+	// should get notified even if "/foo" is not the path it is watching.
 
-	return &watcherHub{
-		EventHistory: clonedHistory,
+	if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
+		w.eventChan <- e
+		return true
 	}
+	return false
 }

+ 142 - 0
store/watcher_hub.go

@@ -0,0 +1,142 @@
+package store
+
+import (
+	"container/list"
+	"path"
+	"strings"
+	"sync/atomic"
+
+	etcdErr "github.com/coreos/etcd/error"
+)
+
+// A watcherHub contains all subscribed watchers
+// watchers is a map with watched path as key and a list of watchers as value
+// EventHistory keeps the old events for watcherHub. It is used to help
+// a watcher get a continuous event history. Otherwise a watcher might miss the
+// events that happen between the end of the first watch command and the start
+// of the second command.
+type watcherHub struct {
+	watchers     map[string]*list.List
+	count        int64 // current number of watchers.
+	EventHistory *EventHistory
+}
+
+// newWatchHub creates a watchHub. The capacity determines how many events we will
+// keep in the eventHistory.
+// Typically, we only need to keep a small size of history[smaller than 20K].
+// Ideally, it should be smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
+func newWatchHub(capacity int) *watcherHub {
+	return &watcherHub{
+		watchers:     make(map[string]*list.List),
+		EventHistory: newEventHistory(capacity),
+	}
+}
+
+// watch function returns an Event channel.
+// If recursive is true, the first change after index under prefix will be sent to the event channel.
+// If recursive is false, the first change after index at prefix will be sent to the event channel.
+// If index is zero, watch will start from the current index + 1.
+func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
+	eventChan := make(chan *Event, 1)
+
+	e, err := wh.EventHistory.scan(prefix, index)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if e != nil {
+		eventChan <- e
+		return eventChan, nil
+	}
+
+	w := &watcher{
+		eventChan:  eventChan,
+		recursive:  recursive,
+		sinceIndex: index - 1, // to catch Expire()
+	}
+
+	l, ok := wh.watchers[prefix]
+
+	if ok { // add the new watcher to the back of the list
+		l.PushBack(w)
+
+	} else { // create a new list and add the new watcher
+		l := list.New()
+		l.PushBack(w)
+		wh.watchers[prefix] = l
+	}
+
+	atomic.AddInt64(&wh.count, 1)
+
+	return eventChan, nil
+}
+
+// notify function accepts an event and notify to the watchers.
+func (wh *watcherHub) notify(e *Event) {
+	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
+
+	segments := strings.Split(e.Key, "/")
+
+	currPath := "/"
+
+	// walk through all the segments of the path and notify the watchers
+	// if the path is "/foo/bar", it will notify watchers with path "/",
+	// "/foo" and "/foo/bar"
+
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+		// notify the watchers who are interested in the changes of current path
+		wh.notifyWatchers(e, currPath, false)
+	}
+}
+
+func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
+	l, ok := wh.watchers[path]
+
+	if ok {
+		curr := l.Front()
+		notifiedAll := true
+
+		for {
+			if curr == nil { // we have reached the end of the list
+				if notifiedAll {
+					// if we have notified all watcher in the list
+					// we can delete the list
+					delete(wh.watchers, path)
+				}
+
+				break
+			}
+
+			next := curr.Next() // save reference to the next one in the list
+
+			w, _ := curr.Value.(*watcher)
+
+			if w.notify(e, e.Key == path, deleted) {
+				// if we successfully notify a watcher
+				// we need to remove the watcher from the list
+				// and decrease the counter
+
+				l.Remove(curr)
+				atomic.AddInt64(&wh.count, -1)
+			} else {
+				// if any watcher in the list is not interested
+				// in the event, we should keep the list in the map
+				notifiedAll = false
+			}
+
+			curr = next // update current to the next
+		}
+	}
+}
+
+// clone function clones the watcherHub and return the cloned one.
+// only clone the static content. do not clone the current watchers.
+func (wh *watcherHub) clone() *watcherHub {
+	clonedHistory := wh.EventHistory.clone()
+
+	return &watcherHub{
+		EventHistory: clonedHistory,
+	}
+}