Browse Source

Merge branch 'fileSystem' of github.com:xiangli-cmu/etcd into fileSystem

Hongchao Deng 12 years ago
parent
commit
8ab6684bf5

+ 11 - 4
Documentation/etcd-file-system.md

@@ -30,13 +30,20 @@ Besides the file and directory difference, all nodes have common attributes and
     - If the node is a directory, the child nodes of the directory will be returned.
     - If recursive is true, it will recursively get the nodes of the directory.
 
-- **Set** (path, value[optional], ttl [optional])
+- **Create** (path, value[optional], ttl [optional])
 
-  Set the value to a file. Set operation will help to create intermediate directories with no expiration time.
-    - If the value is given, set will create a file
-    - If the value is not given, set will crate a directory
+  Create a file. Create operation will help to create intermediate directories with no expiration time.
+    - If the file already exists, create will fail.
+    - If the value is given, set will create a file.
+    - If the value is not given, set will crate a directory.
     - If ttl is given, the node will be deleted when it expires.
 
+- **Update** (path, value[optional], ttl [optional])
+
+  Update the content of the node.
+    - If the value is given, the value of the key will be updated.
+    - If ttl is given, the expiration time of the node will be updated.
+
 - **Delete** (path, recursive)
 
   Delete the node of given path.

+ 2 - 1
file_system/event.go

@@ -8,7 +8,8 @@ import (
 
 const (
 	Get        = "get"
-	Set        = "set"
+	Create     = "create"
+	Update     = "update"
 	Delete     = "delete"
 	TestAndSet = "testAndSet"
 )

+ 6 - 6
file_system/event_test.go

@@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) {
 
 	// Add
 	for i := 0; i < 200; i++ {
-		e := newEvent(Set, "/foo", uint64(i), 0)
+		e := newEvent(Create, "/foo", uint64(i), 0)
 		eh.addEvent(e)
 	}
 
@@ -33,11 +33,11 @@ func TestScanHistory(t *testing.T) {
 	eh := newEventHistory(100)
 
 	// Add
-	eh.addEvent(newEvent(Set, "/foo", 1, 0))
-	eh.addEvent(newEvent(Set, "/foo/bar", 2, 0))
-	eh.addEvent(newEvent(Set, "/foo/foo", 3, 0))
-	eh.addEvent(newEvent(Set, "/foo/bar/bar", 4, 0))
-	eh.addEvent(newEvent(Set, "/foo/foo/foo", 5, 0))
+	eh.addEvent(newEvent(Create, "/foo", 1, 0))
+	eh.addEvent(newEvent(Create, "/foo/bar", 2, 0))
+	eh.addEvent(newEvent(Create, "/foo/foo", 3, 0))
+	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0))
+	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0))
 
 	e, err := eh.scan("/foo", 1)
 	if err != nil || e.Index != 1 {

+ 111 - 64
file_system/file_system.go

@@ -19,23 +19,23 @@ type FileSystem struct {
 
 func New() *FileSystem {
 	return &FileSystem{
-		Root:       newDir("/", 0, 0, nil, ""),
+		Root:       newDir("/", 0, 0, nil, "", Permanent),
 		WatcherHub: newWatchHub(1000),
 	}
 
 }
 
-func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint64) (*Event, error) {
-	// TODO: add recursive get
-	n, err := fs.InternalGet(keyPath, index, term)
+func (fs *FileSystem) Get(nodePath string, recusive bool, index uint64, term uint64) (*Event, error) {
+	n, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil {
 		return nil, err
 	}
 
-	e := newEvent(Get, keyPath, index, term)
+	e := newEvent(Get, nodePath, index, term)
 
 	if n.IsDir() { // node is dir
+		e.Dir = true
 
 		children, _ := n.List()
 		e.KVPairs = make([]KeyValuePair, len(children))
@@ -57,7 +57,6 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint
 
 		// eliminate hidden nodes
 		e.KVPairs = e.KVPairs[:i]
-
 	} else { // node is file
 		e.Value = n.Value
 	}
@@ -65,93 +64,133 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint
 	return e, nil
 }
 
-func (fs *FileSystem) Set(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
-	keyPath = path.Clean("/" + keyPath)
+// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
+// If the node has already existed, create will fail.
+// If any node on the path is a file, create will fail.
+func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+	nodePath = path.Clean("/" + nodePath)
 
-	// update file system known index and term
-	fs.Index, fs.Term = index, term
+	// make sure we can create the node
+	_, err := fs.InternalGet(nodePath, index, term)
 
-	dir, name := path.Split(keyPath)
+	if err == nil { // key already exists
+		return nil, etcdErr.NewError(105, nodePath)
+	}
 
-	// walk through the keyPath and get the last directory node
-	d, err := fs.walk(dir, fs.checkDir)
+	etcdError, _ := err.(etcdErr.Error)
 
-	if err != nil {
+	if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
 		return nil, err
 	}
 
-	e := newEvent(Set, keyPath, fs.Index, fs.Term)
-	e.Value = value
+	dir, _ := path.Split(nodePath)
 
-	f, err := d.GetFile(name)
+	// walk through the nodePath, create dirs and get the last directory node
+	d, err := fs.walk(dir, fs.checkDir)
 
-	if err == nil {
+	if err != nil {
+		return nil, err
+	}
 
-		if f != nil { // update previous file if exist
-			e.PrevValue = f.Value
-			f.Write(e.Value, index, term)
+	e := newEvent(Create, nodePath, fs.Index, fs.Term)
 
-			// if the previous ExpireTime is not Permanent and expireTime is given
-			// we stop the previous expire routine
-			if f.ExpireTime != Permanent && expireTime != Permanent {
-				f.stopExpire <- true
-			}
-		} else { // create new file
+	var n *Node
 
-			f = newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime)
+	if len(value) != 0 { // create file
+		e.Value = value
 
-			err = d.Add(f)
+		n = newFile(nodePath, value, fs.Index, fs.Term, d, "", expireTime)
 
-		}
+	} else { // create directory
+		e.Dir = true
+
+		n = newDir(nodePath, fs.Index, fs.Term, d, "", expireTime)
 
 	}
 
+	err = d.Add(n)
+
 	if err != nil {
 		return nil, err
 	}
 
 	// Node with TTL
 	if expireTime != Permanent {
-		go f.Expire()
-		e.Expiration = &f.ExpireTime
+		go n.Expire()
+		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
 	}
 
+	fs.WatcherHub.notify(e)
 	return e, nil
 }
 
-func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
-	f, err := fs.InternalGet(keyPath, index, term)
+// Update function updates the value/ttl of the node.
+// If the node is a file, the value and the ttl can be updated.
+// If the node is a directory, only the ttl can be updated.
+func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+	n, err := fs.InternalGet(nodePath, index, term)
 
-	if err != nil {
+	if err != nil { // if the node does not exist, return error
+		return nil, err
+	}
 
-		etcdError, _ := err.(etcdErr.Error)
-		if etcdError.ErrorCode == 100 { // file does not exist
+	e := newEvent(Update, nodePath, fs.Index, fs.Term)
 
-			if prevValue == "" && prevIndex == 0 { // test against if prevValue is empty
-				fs.Set(keyPath, value, expireTime, index, term)
-				e := newEvent(TestAndSet, keyPath, index, term)
-				e.Value = value
-				return e, nil
-			}
+	if n.IsDir() { // if the node is a directory, we can only update ttl
 
-			return nil, err
+		if len(value) != 0 {
+			return nil, etcdErr.NewError(102, nodePath)
+		}
+
+	} else { // if the node is a file, we can update value and ttl
+		e.PrevValue = n.Value
 
+		if len(value) != 0 {
+			e.Value = value
 		}
 
+		n.Write(value, index, term)
+	}
+
+	// update ttl
+	if n.ExpireTime != Permanent && expireTime != Permanent {
+		n.stopExpire <- true
+	}
+
+	if expireTime != Permanent {
+		go n.Expire()
+		e.Expiration = &n.ExpireTime
+		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
+	}
+
+	fs.WatcherHub.notify(e)
+	return e, nil
+}
+
+func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
+	value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+
+	f, err := fs.InternalGet(nodePath, index, term)
+
+	if err != nil {
+
 		return nil, err
 	}
 
 	if f.IsDir() { // can only test and set file
-		return nil, etcdErr.NewError(102, keyPath)
+		return nil, etcdErr.NewError(102, nodePath)
 	}
 
 	if f.Value == prevValue || f.ModifiedIndex == prevIndex {
 		// if test succeed, write the value
-		e := newEvent(TestAndSet, keyPath, index, term)
+		e := newEvent(TestAndSet, nodePath, index, term)
 		e.PrevValue = f.Value
 		e.Value = value
 		f.Write(value, index, term)
+
+		fs.WatcherHub.notify(e)
+
 		return e, nil
 	}
 
@@ -159,20 +198,16 @@ func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uin
 	return nil, etcdErr.NewError(101, cause)
 }
 
-func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term uint64) (*Event, error) {
-	n, err := fs.InternalGet(keyPath, index, term)
+// Delete function deletes the node at the given path.
+// If the node is a directory, recursive must be true to delete it.
+func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
+	n, err := fs.InternalGet(nodePath, index, term)
 
-	if err != nil {
+	if err != nil { // if the node does not exist, return error
 		return nil, err
 	}
 
-	err = n.Remove(recurisive)
-
-	if err != nil {
-		return nil, err
-	}
-
-	e := newEvent(Delete, keyPath, index, term)
+	e := newEvent(Delete, nodePath, index, term)
 
 	if n.IsDir() {
 		e.Dir = true
@@ -180,12 +215,24 @@ func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term
 		e.PrevValue = n.Value
 	}
 
+	callback := func(path string) { // notify function
+		fs.WatcherHub.notifyWithPath(e, path, true)
+	}
+
+	err = n.Remove(recursive, callback)
+
+	if err != nil {
+		return nil, err
+	}
+
+	fs.WatcherHub.notify(e)
+
 	return e, nil
 }
 
-// walk function walks all the keyPath and apply the walkFunc on each directory
-func (fs *FileSystem) walk(keyPath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
-	components := strings.Split(keyPath, "/")
+// walk function walks all the nodePath and apply the walkFunc on each directory
+func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
+	components := strings.Split(nodePath, "/")
 
 	curr := fs.Root
 
@@ -205,9 +252,9 @@ func (fs *FileSystem) walk(keyPath string, walkFunc func(prev *Node, component s
 	return curr, nil
 }
 
-// InternalGet function get the node of the given keyPath.
-func (fs *FileSystem) InternalGet(keyPath string, index uint64, term uint64) (*Node, error) {
-	keyPath = path.Clean("/" + keyPath)
+// InternalGet function get the node of the given nodePath.
+func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
+	nodePath = path.Clean("/" + nodePath)
 
 	// update file system known index and term
 	fs.Index, fs.Term = index, term
@@ -226,7 +273,7 @@ func (fs *FileSystem) InternalGet(keyPath string, index uint64, term uint64) (*N
 		return nil, etcdErr.NewError(100, path.Join(parent.Path, name))
 	}
 
-	f, err := fs.walk(keyPath, walkFunc)
+	f, err := fs.walk(nodePath, walkFunc)
 
 	if err != nil {
 		return nil, err
@@ -247,7 +294,7 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
 		return subDir, nil
 	}
 
-	n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL)
+	n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL, Permanent)
 
 	parent.Children[dirName] = n
 

+ 79 - 28
file_system/file_system_test.go

@@ -5,26 +5,63 @@ import (
 	"time"
 )
 
-func TestSetAndGet(t *testing.T) {
+func TestCreateAndGet(t *testing.T) {
 	fs := New()
-	setAndGet(fs, "/foobar", t)
-	setAndGet(fs, "/foo/bar", t)
-	setAndGet(fs, "/foo/foo/bar", t)
+
+	// this should create successfully
+	createAndGet(fs, "/foobar", t)
+	createAndGet(fs, "/foo/bar", t)
+	createAndGet(fs, "/foo/foo/bar", t)
+
+	// already exist, create should fail
+	_, err := fs.Create("/foobar", "bar", Permanent, 1, 1)
+
+	if err == nil {
+		t.Fatal("Create should fail")
+	}
+
+	// meet file, create should fail
+	_, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1)
+
+	if err == nil {
+		t.Fatal("Create should fail")
+	}
+
+	// create a directory
+	_, err = fs.Create("/fooDir", "", Permanent, 3, 1)
+
+	if err != nil {
+		t.Fatal("Cannot create /fooDir")
+	}
+
+	e, err := fs.Get("/fooDir", false, 3, 1)
+
+	if err != nil || e.Dir != true {
+		t.Fatal("Cannot create /fooDir ")
+	}
+
+	// create a file under directory
+	_, err = fs.Create("/fooDir/bar", "bar", Permanent, 4, 1)
+
+	if err != nil {
+		t.Fatal("Cannot create /fooDir/bar = bar")
+	}
+
 }
 
 func TestUpdateFile(t *testing.T) {
 	fs := New()
 
-	_, err := fs.Set("/foo/bar", "bar", Permanent, 1, 1)
+	_, err := fs.Create("/foo/bar", "bar", Permanent, 1, 1)
 
 	if err != nil {
-		t.Fatalf("cannot set %s=bar [%s]", "/foo/bar", err.Error())
+		t.Fatalf("cannot update %s=bar [%s]", "/foo/bar", err.Error())
 	}
 
-	_, err = fs.Set("/foo/bar", "barbar", Permanent, 2, 1)
+	_, err = fs.Update("/foo/bar", "barbar", Permanent, 2, 1)
 
 	if err != nil {
-		t.Fatalf("cannot set %s=barbar [%s]", "/foo/bar", err.Error())
+		t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error())
 	}
 
 	e, err := fs.Get("/foo/bar", false, 2, 1)
@@ -43,11 +80,11 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo
 	// set key-value /foo/foo=bar
-	fs.Set("/foo/foo", "bar", Permanent, 1, 1)
+	fs.Create("/foo/foo", "bar", Permanent, 1, 1)
 
 	// create dir /foo/fooDir
 	// set key-value /foo/fooDir/foo=bar
-	fs.Set("/foo/fooDir/foo", "bar", Permanent, 2, 1)
+	fs.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1)
 
 	e, err := fs.Get("/foo", true, 2, 1)
 
@@ -74,7 +111,7 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo/_hidden
 	// set key-value /foo/_hidden/foo -> bar
-	fs.Set("/foo/_hidden/foo", "bar", Permanent, 3, 1)
+	fs.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1)
 
 	e, _ = fs.Get("/foo", false, 2, 1)
 
@@ -86,7 +123,7 @@ func TestListDirectory(t *testing.T) {
 func TestRemove(t *testing.T) {
 	fs := New()
 
-	fs.Set("/foo", "bar", Permanent, 1, 1)
+	fs.Create("/foo", "bar", Permanent, 1, 1)
 	_, err := fs.Delete("/foo", false, 1, 1)
 
 	if err != nil {
@@ -99,9 +136,9 @@ func TestRemove(t *testing.T) {
 		t.Fatalf("can get the node after deletion")
 	}
 
-	fs.Set("/foo/bar", "bar", Permanent, 1, 1)
-	fs.Set("/foo/car", "car", Permanent, 1, 1)
-	fs.Set("/foo/dar/dar", "dar", Permanent, 1, 1)
+	fs.Create("/foo/bar", "bar", Permanent, 1, 1)
+	fs.Create("/foo/car", "car", Permanent, 1, 1)
+	fs.Create("/foo/dar/dar", "dar", Permanent, 1, 1)
 
 	_, err = fs.Delete("/foo", false, 1, 1)
 
@@ -128,7 +165,7 @@ func TestExpire(t *testing.T) {
 
 	expire := time.Now().Add(time.Second)
 
-	fs.Set("/foo", "bar", expire, 1, 1)
+	fs.Create("/foo", "bar", expire, 1, 1)
 
 	_, err := fs.InternalGet("/foo", 1, 1)
 
@@ -144,7 +181,7 @@ func TestExpire(t *testing.T) {
 		t.Fatalf("can get the node after expiration time")
 	}
 
-	fs.Set("/foo", "bar", expire, 1, 1)
+	fs.Create("/foo", "bar", expire, 1, 1)
 
 	time.Sleep(time.Millisecond * 50)
 	_, err = fs.InternalGet("/foo", 1, 1)
@@ -155,14 +192,14 @@ func TestExpire(t *testing.T) {
 
 	expire = time.Now().Add(time.Second)
 
-	fs.Set("/foo", "bar", expire, 1, 1)
+	fs.Create("/foo", "bar", expire, 1, 1)
 	fs.Delete("/foo", false, 1, 1)
 
 }
 
 func TestTestAndSet(t *testing.T) {
 	fs := New()
-	fs.Set("/foo", "bar", Permanent, 1, 1)
+	fs.Create("/foo", "bar", Permanent, 1, 1)
 
 	// test on wrong previous value
 	_, err := fs.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
@@ -191,23 +228,36 @@ func TestTestAndSet(t *testing.T) {
 	if e.PrevValue != "car" || e.Value != "bar" {
 		t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar")
 	}
+}
 
-	// test on empty previous value
-	e, err = fs.TestAndSet("/fooDir/foo", "", 0, "bar", Permanent, 4, 1)
-	if err != nil {
-		t.Fatal("test on empty node should be succeeded")
+func TestWatchRemove(t *testing.T) {
+	fs := New()
+	fs.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
+
+	// watch at a deeper path
+	c, _ := fs.WatcherHub.watch("/foo/foo/foo", false, 0)
+	fs.Delete("/foo", true, 2, 1)
+	e := <-c
+	if e.Key != "/foo" {
+		t.Fatal("watch for delete fails")
 	}
 
-	if e.Key != "/fooDir/foo" || e.PrevValue != "" || e.Value != "bar" {
-		t.Fatalf("[%v/%v] [%v/%v] [%v/%v]", e.Key, "/fooDir/foo", e.PrevValue, "", e.Value, "bar")
+	fs.Create("/foo/foo/foo", "bar", Permanent, 3, 1)
+	// watch at a prefix
+	c, _ = fs.WatcherHub.watch("/foo", true, 0)
+	fs.Delete("/foo/foo/foo", false, 4, 1)
+	e = <-c
+	if e.Key != "/foo/foo/foo" {
+		t.Fatal("watch for delete fails")
 	}
+
 }
 
-func setAndGet(fs *FileSystem, path string, t *testing.T) {
-	_, err := fs.Set(path, "bar", Permanent, 1, 1)
+func createAndGet(fs *FileSystem, path string, t *testing.T) {
+	_, err := fs.Create(path, "bar", Permanent, 1, 1)
 
 	if err != nil {
-		t.Fatalf("cannot set %s=bar [%s]", path, err.Error())
+		t.Fatalf("cannot create %s=bar [%s]", path, err.Error())
 	}
 
 	e, err := fs.Get(path, false, 1, 1)
@@ -219,4 +269,5 @@ func setAndGet(fs *FileSystem, path string, t *testing.T) {
 	if e.Value != "bar" {
 		t.Fatalf("expect value of %s is bar [%s]", path, e.Value)
 	}
+
 }

+ 20 - 9
file_system/node.go

@@ -34,9 +34,9 @@ type Node struct {
 	stopExpire    chan bool // stop expire routine channel
 }
 
-func newFile(keyPath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+func newFile(nodePath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
 	return &Node{
-		Path:          keyPath,
+		Path:          nodePath,
 		CreateIndex:   createIndex,
 		CreateTerm:    createTerm,
 		ModifiedIndex: createIndex,
@@ -49,14 +49,15 @@ func newFile(keyPath string, value string, createIndex uint64, createTerm uint64
 	}
 }
 
-func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, ACL string) *Node {
+func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
 	return &Node{
-		Path:        keyPath,
+		Path:        nodePath,
 		CreateIndex: createIndex,
 		CreateTerm:  createTerm,
 		Parent:      parent,
 		ACL:         ACL,
 		stopExpire:  make(chan bool, 1),
+		ExpireTime:  expireTime,
 		Children:    make(map[string]*Node),
 	}
 }
@@ -64,7 +65,7 @@ func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node,
 // Remove function remove the node.
 // If the node is a directory and recursive is true, the function will recursively remove
 // add nodes under the receiver node.
-func (n *Node) Remove(recursive bool) error {
+func (n *Node) Remove(recursive bool, callback func(path string)) error {
 	n.mu.Lock()
 	defer n.mu.Unlock()
 
@@ -79,6 +80,11 @@ func (n *Node) Remove(recursive bool) error {
 			// This is the only pointer to Node object
 			// Handled by garbage collector
 			delete(n.Parent.Children, name)
+
+			if callback != nil {
+				callback(n.Path)
+			}
+
 			n.stopExpire <- true
 			n.status = removed
 		}
@@ -91,13 +97,18 @@ func (n *Node) Remove(recursive bool) error {
 	}
 
 	for _, child := range n.Children { // delete all children
-		child.Remove(true)
+		child.Remove(true, callback)
 	}
 
 	// delete self
 	_, name := path.Split(n.Path)
 	if n.Parent.Children[name] == n {
 		delete(n.Parent.Children, name)
+
+		if callback != nil {
+			callback(n.Path)
+		}
+
 		n.stopExpire <- true
 		n.status = removed
 	}
@@ -210,7 +221,7 @@ func (n *Node) Clone() *Node {
 		return newFile(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
 	}
 
-	clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL)
+	clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
 
 	for key, child := range n.Children {
 		clone.Children[key] = child.Clone()
@@ -234,14 +245,14 @@ func (n *Node) IsDir() bool {
 func (n *Node) Expire() {
 	duration := n.ExpireTime.Sub(time.Now())
 	if duration <= 0 {
-		n.Remove(true)
+		n.Remove(true, nil)
 		return
 	}
 
 	select {
 	// if timeout, delete the node
 	case <-time.After(duration):
-		n.Remove(true)
+		n.Remove(true, nil)
 		return
 
 	// if stopped, return

+ 61 - 24
file_system/watcher.go

@@ -12,6 +12,11 @@ type watcherHub struct {
 	EventHistory *EventHistory
 }
 
+type watcher struct {
+	eventChan chan *Event
+	recursive bool
+}
+
 func newWatchHub(capacity int) *watcherHub {
 	return &watcherHub{
 		watchers:     make(map[string]*list.List),
@@ -19,57 +24,89 @@ func newWatchHub(capacity int) *watcherHub {
 	}
 }
 
-func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) {
+// watch function returns an Event channel.
+// If recursive is true, the first change after index under prefix will be sent to the event channel.
+// If recursive is false, the first change after index at prefix will be sent to the event channel.
+// If index is zero, watch will start from the current index + 1.
+func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, error) {
 	eventChan := make(chan *Event, 1)
 
 	e, err := wh.EventHistory.scan(prefix, index)
 
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
 
 	if e != nil {
 		eventChan <- e
-		return nil, eventChan
+		return eventChan, nil
+	}
+
+	w := &watcher{
+		eventChan: eventChan,
+		recursive: recursive,
 	}
 
 	l, ok := wh.watchers[prefix]
 
-	if ok {
-		l.PushBack(eventChan)
-	} else {
+	if ok { // add the new watcher to the back of the list
+		l.PushBack(w)
+
+	} else { // create a new list and add the new watcher
 		l := list.New()
-		l.PushBack(eventChan)
+		l.PushBack(w)
 		wh.watchers[prefix] = l
 	}
 
-	return nil, eventChan
+	return eventChan, nil
 }
 
-func (wh *watcherHub) notify(e *Event) {
+func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
+	l, ok := wh.watchers[path]
 
-	segments := strings.Split(e.Key, "/")
-	currPath := "/"
+	if ok {
 
-	// walk through all the paths
-	for _, segment := range segments {
-		currPath = path.Join(currPath, segment)
+		curr := l.Front()
+		notifiedAll := true
 
-		l, ok := wh.watchers[currPath]
+		for {
 
-		if ok {
+			if curr == nil { // we have reached the end of the list
 
-			for {
-				element := l.Front()
-				if element == nil {
-					delete(wh.watchers, currPath)
-					break
+				if notifiedAll {
+					// if we have notified all watcher in the list
+					// we can delete the list
+					delete(wh.watchers, path)
 				}
-				c, _ := element.Value.(chan *Event)
-				c <- e
-				l.Remove(element)
+				break
+			}
+
+			next := curr.Next() // save the next
+
+			w, _ := curr.Value.(*watcher)
+
+			if w.recursive || force || e.Key == path {
+				w.eventChan <- e
+				l.Remove(curr)
+			} else {
+				notifiedAll = false
 			}
+
+			curr = next // go to the next one
+
 		}
+	}
+}
+
+func (wh *watcherHub) notify(e *Event) {
 
+	segments := strings.Split(e.Key, "/")
+
+	currPath := "/"
+
+	// walk through all the paths
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+		wh.notifyWithPath(e, currPath, false)
 	}
 }

+ 26 - 2
file_system/watcher_test.go

@@ -6,7 +6,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	wh := newWatchHub(100)
-	err, c := wh.watch("/foo", 0)
+	c, err := wh.watch("/foo", true, 0)
 
 	if err != nil {
 		t.Fatal("%v", err)
@@ -19,7 +19,7 @@ func TestWatch(t *testing.T) {
 		// do nothing
 	}
 
-	e := newEvent(Set, "/foo/bar", 1, 0)
+	e := newEvent(Create, "/foo/bar", 1, 0)
 
 	wh.notify(e)
 
@@ -28,4 +28,28 @@ func TestWatch(t *testing.T) {
 	if e != re {
 		t.Fatal("recv != send")
 	}
+
+	c, _ = wh.watch("/foo", false, 0)
+
+	e = newEvent(Create, "/foo/bar", 1, 0)
+
+	wh.notify(e)
+
+	select {
+	case <-c:
+		t.Fatal("should not receive from channel if not recursive")
+	default:
+		// do nothing
+	}
+
+	e = newEvent(Create, "/foo", 1, 0)
+
+	wh.notify(e)
+
+	re = <-c
+
+	if e != re {
+		t.Fatal("recv != send")
+	}
+
 }