Browse Source

init filesystem

Xiang Li 12 years ago
parent
commit
329f8c4fa3

+ 16 - 6
README.md

@@ -1,4 +1,5 @@
 # etcd
+README version 0.1.0
 
 [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd)
 
@@ -272,7 +273,7 @@ Next, lets configure etcd to use this keypair:
 You can now test the configuration using https:
 
 ```sh
-curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar
+curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v
 ```
 
 You should be able to see the handshake succeed.
@@ -302,7 +303,7 @@ We can also do authentication using CA certs. The clients will provide their cer
 Try the same request to this server:
 
 ```sh
-curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar
+curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v
 ```
 
 The request should be rejected by the server.
@@ -347,6 +348,9 @@ We use -s to specify server port and -c to specify client port and -d to specify
 ./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d nodes/node1 -n node1
 ```
 
+**Note:** If you want to run etcd on external IP address and still have access locally you need to add `-cl 0.0.0.0` so that it will listen on both external and localhost addresses.
+A similar argument `-sl` is used to setup the listening address for the server port.
+
 Let the join two more nodes to this cluster using the -C argument:
 
 ```sh
@@ -363,7 +367,7 @@ curl -L http://127.0.0.1:4001/v1/machines
 We should see there are three nodes in the cluster
 
 ```
-0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003
+http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003
 ```
 
 The machine list is also available via this API:
@@ -373,7 +377,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
 ```
 
 ```json
-[{"action":"GET","key":"/machines/node1","value":"0.0.0.0,7001,4001","index":4},{"action":"GET","key":"/machines/node3","value":"0.0.0.0,7002,4002","index":4},{"action":"GET","key":"/machines/node4","value":"0.0.0.0,7003,4003","index":4}]
+[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003","index":4}]
 ```
 
 The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```.
@@ -386,7 +390,7 @@ curl -L http://127.0.0.1:4001/v1/leader
 The first server we set up should be the leader, if it has not dead during these commands.
 
 ```
-0.0.0.0:7001
+http://127.0.0.1:7001
 ```
 
 Now we can do normal SET and GET operations on keys as we explored earlier.
@@ -414,7 +418,13 @@ curl -L http://127.0.0.1:4001/v1/leader
 ```
 
 ```
-0.0.0.0:7002 or 0.0.0.0:7003
+http://127.0.0.1:7002
+```
+
+or
+
+```
+http://127.0.0.1:7003
 ```
 
 You should be able to see this:

+ 2 - 0
error/error.go

@@ -17,6 +17,8 @@ func init() {
 	errors[101] = "The given PrevValue is not equal to the value of the key"
 	errors[102] = "Not A File"
 	errors[103] = "Reached the max number of machines in the cluster"
+	errors[104] = "Not A Directory"
+	errors[105] = "Already exists"
 
 	// Post form related errors
 	errors[200] = "Value is Required in POST form"

+ 2 - 2
etcd.go

@@ -52,8 +52,8 @@ func init() {
 	flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)")
 	flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the advertised public hostname:port for etcd client communication")
 	flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the advertised public hostname:port for raft server communication")
-	flag.StringVar(&argInfo.EtcdListenHost, "cl", "127.0.0.1", "the listening hostname for etcd client communication")
-	flag.StringVar(&argInfo.RaftListenHost, "sl", "127.0.0.1", "the listening hostname for raft server communication")
+	flag.StringVar(&argInfo.EtcdListenHost, "cl", "", "the listening hostname for etcd client communication (defaults to advertised ip)")
+	flag.StringVar(&argInfo.RaftListenHost, "sl", "", "the listening hostname for raft server communication (defaults to advertised ip)")
 	flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
 
 	flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")

+ 12 - 12
etcd_handlers.go

@@ -176,18 +176,18 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er
 //--------------------------------------
 
 // Handler to return the current leader's raft address
-func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	leader := r.Leader()
-
-	if leader != "" {
-		w.WriteHeader(http.StatusOK)
-		raftURL, _ := nameToRaftURL(leader)
-		w.Write([]byte(raftURL))
-		return nil
-	} else {
-		return etcdErr.NewError(301, "")
-	}
-}
+// func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
+// 	leader := r.Leader()
+
+// 	if leader != "" {
+// 		w.WriteHeader(http.StatusOK)
+// 		raftURL, _ := nameToRaftURL(leader)
+// 		w.Write([]byte(raftURL))
+// 		return nil
+// 	} else {
+// 		return etcdErr.NewError(301, "")
+// 	}
+// }
 
 // Handler to return all the known machines in the current cluster
 func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {

+ 123 - 0
file_system/event.go

@@ -0,0 +1,123 @@
+package fileSystem
+
+import (
+	"strings"
+	"sync"
+	"time"
+)
+
+const (
+	Set         = "set"
+	Delete      = "delete"
+	TestAndSet  = "testAndSet"
+	TestIAndSet = "testiAndSet"
+)
+
+type Event struct {
+	Action     string     `json:"action"`
+	Key        string     `json:"key"`
+	Dir        bool       `json:"dir,omitempty"`
+	PrevValue  string     `json:"prevValue,omitempty"`
+	Value      string     `json:"value,omitempty"`
+	Expiration *time.Time `json:"expiration,omitempty"`
+	TTL        int64      `json:"ttl,omitempty"` // Time to live in second
+	// The command index of the raft machine when the command is executed
+	Index uint64 `json:"index"`
+	Term  uint64 `json:"term"`
+}
+
+func newEvent(action string, key string, index uint64, term uint64) *Event {
+	return &Event{
+		Action: action,
+		Key:    key,
+		Index:  index,
+		Term:   term,
+	}
+}
+
+type eventQueue struct {
+	events   []*Event
+	size     int
+	front    int
+	back     int
+	capacity int
+}
+
+func (eq *eventQueue) insert(e *Event) bool {
+
+	eq.back = (eq.back + 1) % eq.capacity
+	eq.events[eq.back] = e
+
+	if eq.size == eq.capacity { //dequeue
+		eq.front = (eq.back + 1) % eq.capacity
+		return true
+	} else {
+		eq.size++
+		return false
+	}
+
+}
+
+type EventHistory struct {
+	Queue      eventQueue
+	StartIndex uint64
+	rwl        sync.RWMutex
+}
+
+func newEventHistory(capacity int) *EventHistory {
+	return &EventHistory{
+		Queue: eventQueue{
+			capacity: capacity,
+			events:   make([]*Event, capacity),
+			back:     -1,
+		},
+	}
+}
+
+// addEvent function adds event into the eventHistory
+func (eh *EventHistory) addEvent(e *Event) {
+	eh.rwl.Lock()
+	defer eh.rwl.Unlock()
+
+	if eh.Queue.insert(e) {
+		eh.StartIndex++
+	} else {
+		eh.StartIndex = eh.Queue.events[eh.Queue.front].Index
+	}
+}
+
+func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
+	eh.rwl.RLock()
+	defer eh.rwl.RUnlock()
+
+	start := index - eh.StartIndex
+
+	if start < 0 {
+
+		// TODO: Add error type
+		return nil, nil
+	}
+
+	if start >= uint64(eh.Queue.size) {
+
+		return nil, nil
+	}
+
+	i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity))
+
+	for {
+
+		e := eh.Queue.events[i]
+		if strings.HasPrefix(e.Key, prefix) {
+			return e, nil
+		}
+
+		i = (i + 1) % eh.Queue.capacity
+
+		if i == eh.Queue.back {
+			// TODO: Add error type
+			return nil, nil
+		}
+	}
+
+}

+ 65 - 0
file_system/event_test.go

@@ -0,0 +1,65 @@
+package fileSystem
+
+import (
+	"testing"
+)
+
+// TestEventQueue tests a queue with capacity = 100
+// Add 200 events into that queue, and test if the
+// previous 100 events have been swapped out.
+func TestEventQueue(t *testing.T) {
+
+	eh := newEventHistory(100)
+
+	// Add
+	for i := 0; i < 200; i++ {
+		e := newEvent(Set, "/foo", uint64(i), 0)
+		eh.addEvent(e)
+	}
+
+	// Test
+	j := 100
+	for i := eh.Queue.front; i != eh.Queue.back; i = (i + 1) % eh.Queue.capacity {
+		e := eh.Queue.events[i]
+		if e.Index != uint64(j) {
+			t.Fatalf("queue error!")
+		}
+		j++
+	}
+
+}
+
+func TestScanHistory(t *testing.T) {
+	eh := newEventHistory(100)
+
+	// Add
+	eh.addEvent(newEvent(Set, "/foo", 1, 0))
+	eh.addEvent(newEvent(Set, "/foo/bar", 2, 0))
+	eh.addEvent(newEvent(Set, "/foo/foo", 3, 0))
+	eh.addEvent(newEvent(Set, "/foo/bar/bar", 4, 0))
+	eh.addEvent(newEvent(Set, "/foo/foo/foo", 5, 0))
+
+	e, err := eh.scan("/foo", 1)
+	if err != nil || e.Index != 1 {
+		t.Fatalf("scan error [/foo] [1] %v", e.Index)
+	}
+
+	e, err = eh.scan("/foo/bar", 1)
+
+	if err != nil || e.Index != 2 {
+		t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
+	}
+
+	e, err = eh.scan("/foo/bar", 3)
+
+	if err != nil || e.Index != 4 {
+		t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
+	}
+
+	e, err = eh.scan("/foo/bar", 6)
+
+	if e != nil {
+		t.Fatalf("bad index shoud reuturn nil")
+	}
+
+}

+ 139 - 0
file_system/file_system.go

@@ -0,0 +1,139 @@
+package fileSystem
+
+import (
+	"fmt"
+	"path/filepath"
+	"strings"
+	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
+)
+
+type FileSystem struct {
+	Root         *Node
+	EventHistory *EventHistory
+	WatcherHub   *watcherHub
+	Index        uint64
+	Term         uint64
+}
+
+func New() *FileSystem {
+	return &FileSystem{
+		Root:       newDir("/", 0, 0, nil, ""),
+		WatcherHub: newWatchHub(1000),
+	}
+
+}
+
+func (fs *FileSystem) InternalGet(path string, index uint64, term uint64) (*Node, error) {
+	fmt.Println("GET: ", path)
+	path = filepath.Clean("/" + path)
+
+	// update file system known index and term
+	fs.Index, fs.Term = index, term
+
+	walkFunc := func(parent *Node, dirName string) (*Node, error) {
+		child, ok := parent.Children[dirName]
+		if ok {
+			return child, nil
+		}
+
+		return nil, etcdErr.NewError(100, "get")
+	}
+
+	f, err := fs.walk(path, walkFunc)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return f, nil
+}
+
+func (fs *FileSystem) Set(path string, value string, expireTime time.Time, index uint64, term uint64) error {
+	path = filepath.Clean("/" + path)
+
+	// update file system known index and term
+	fs.Index, fs.Term = index, term
+
+	dir, name := filepath.Split(path)
+
+	// walk through the path and get the last directory node
+	d, err := fs.walk(dir, fs.checkDir)
+
+	if err != nil {
+		return err
+	}
+
+	f := newFile(name, value, fs.Index, fs.Term, d, "", expireTime)
+
+	err = d.Add(f)
+
+	if err == nil {
+		if expireTime != Permanent {
+			go f.Expire()
+		}
+	}
+
+	return err
+
+}
+
+func (fs *FileSystem) TestAndSet() {
+
+}
+
+func (fs *FileSystem) TestIndexAndSet() {
+
+}
+
+func (fs *FileSystem) Delete(path string, recurisive bool, index uint64, term uint64) error {
+	n, err := fs.InternalGet(path, index, term)
+
+	if err != nil {
+		return err
+	}
+
+	return n.Remove(recurisive)
+}
+
+// walk function walks all the path and apply the walkFunc on each directory
+func (fs *FileSystem) walk(path string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
+	components := strings.Split(path, "/")
+
+	curr := fs.Root
+
+	var err error
+	for i := 1; i < len(components); i++ {
+		if len(components[i]) == 0 { // ignore empty string
+			return curr, nil
+		}
+
+		curr, err = walkFunc(curr, components[i])
+		if err != nil {
+			return nil, err
+		}
+
+	}
+
+	return curr, nil
+}
+
+// checkDir function will check whether the component is a directory under parent node.
+// If it is a directory, this function will return the pointer to that node.
+// If it does not exist, this function will create a new directory and return the pointer to that node.
+// If it is a file, this function will return error.
+func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
+
+	subDir, ok := parent.Children[dirName]
+
+	if ok {
+		return subDir, nil
+	}
+
+	n := newDir(filepath.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL)
+
+	parent.Children[dirName] = n
+
+	return n, nil
+}

+ 114 - 0
file_system/file_system_test.go

@@ -0,0 +1,114 @@
+package fileSystem
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSetAndGet(t *testing.T) {
+	fs := New()
+	setAndGet(fs, "/foobar", t)
+	setAndGet(fs, "/foo/bar", t)
+	setAndGet(fs, "/foo/foo/bar", t)
+}
+
+func TestRemove(t *testing.T) {
+	fs := New()
+
+	fs.Set("/foo", "bar", Permanent, 1, 1)
+	err := fs.Delete("/foo", false, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot delete %s [%s]", "/foo", err.Error())
+	}
+
+	_, err = fs.InternalGet("/foo", 1, 1)
+
+	if err == nil || err.Error() != "Key Not Found" {
+		t.Fatalf("can get the node after deletion")
+	}
+
+	fs.Set("/foo/bar", "bar", Permanent, 1, 1)
+	fs.Set("/foo/car", "car", Permanent, 1, 1)
+	fs.Set("/foo/dar/dar", "dar", Permanent, 1, 1)
+
+	err = fs.Delete("/foo", false, 1, 1)
+
+	if err == nil {
+		t.Fatalf("should not be able to delete a directory without recursive")
+	}
+
+	err = fs.Delete("/foo", true, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot delete %s [%s]", "/foo", err.Error())
+	}
+
+	_, err = fs.InternalGet("/foo", 1, 1)
+
+	if err == nil || err.Error() != "Key Not Found" {
+		t.Fatalf("can get the node after deletion ")
+	}
+
+}
+
+func TestExpire(t *testing.T) {
+	fs := New()
+
+	expire := time.Now().Add(time.Second)
+
+	fs.Set("/foo", "bar", expire, 1, 1)
+
+	_, err := fs.InternalGet("/foo", 1, 1)
+
+	if err != nil {
+		t.Fatalf("can not get the node")
+	}
+
+	time.Sleep(time.Second * 2)
+
+	_, err = fs.InternalGet("/foo", 1, 1)
+
+	if err == nil {
+		t.Fatalf("can get the node after expiration time")
+	}
+
+	fs.Set("/foo", "bar", expire, 1, 1)
+
+	time.Sleep(time.Millisecond * 50)
+	_, err = fs.InternalGet("/foo", 1, 1)
+
+	if err == nil {
+		t.Fatalf("can get the node after expiration time")
+	}
+
+	expire = time.Now().Add(time.Second)
+
+	fs.Set("/foo", "bar", expire, 1, 1)
+	fs.Delete("/foo", false, 1, 1)
+
+}
+
+func setAndGet(fs *FileSystem, path string, t *testing.T) {
+	err := fs.Set(path, "bar", Permanent, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot set %s=bar [%s]", path, err.Error())
+	}
+
+	n, err := fs.InternalGet(path, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot get %s [%s]", path, err.Error())
+	}
+
+	value, err := n.Read()
+
+	if err != nil {
+		t.Fatalf("cannot read %s [%s]", path, err.Error())
+	}
+
+	if value != "bar" {
+		t.Fatalf("expect value of %s is bar [%s]", path, value)
+	}
+}

+ 225 - 0
file_system/node.go

@@ -0,0 +1,225 @@
+package fileSystem
+
+import (
+	"fmt"
+	"path/filepath"
+	"sync"
+	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
+)
+
+var (
+	Permanent time.Time
+)
+
+const (
+	normal = iota
+	removed
+)
+
+type Node struct {
+	Path        string
+	CreateIndex uint64
+	CreateTerm  uint64
+	Parent      *Node
+	ExpireTime  time.Time
+	ACL         string
+	Value       string           // for key-value pair
+	Children    map[string]*Node // for directory
+	status      int
+	mu          sync.Mutex
+	removeChan  chan bool // remove channel
+}
+
+func newFile(path string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
+	return &Node{
+		Path:        path,
+		CreateIndex: createIndex,
+		CreateTerm:  createTerm,
+		Parent:      parent,
+		ACL:         ACL,
+		removeChan:  make(chan bool, 1),
+		ExpireTime:  expireTime,
+		Value:       value,
+	}
+}
+
+func newDir(path string, createIndex uint64, createTerm uint64, parent *Node, ACL string) *Node {
+	return &Node{
+		Path:        path,
+		CreateIndex: createIndex,
+		CreateTerm:  createTerm,
+		Parent:      parent,
+		ACL:         ACL,
+		removeChan:  make(chan bool, 1),
+		Children:    make(map[string]*Node),
+	}
+}
+
+// Remove function remove the node.
+// If the node is a directory and recursive is true, the function will recursively remove
+// add nodes under the receiver node.
+func (n *Node) Remove(recursive bool) error {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+
+	if n.status == removed {
+		return nil
+	}
+
+	if !n.IsDir() { // key-value pair
+		_, name := filepath.Split(n.Path)
+
+		if n.Parent.Children[name] == n {
+			delete(n.Parent.Children, name)
+			n.removeChan <- true
+			n.status = removed
+		}
+
+		return nil
+	}
+
+	if !recursive {
+		return etcdErr.NewError(102, "")
+	}
+
+	for _, n := range n.Children { // delete all children
+		n.Remove(true)
+	}
+
+	// delete self
+	_, name := filepath.Split(n.Path)
+	if n.Parent.Children[name] == n {
+		delete(n.Parent.Children, name)
+		n.removeChan <- true
+		n.status = removed
+	}
+
+	return nil
+}
+
+// Get function gets the value of the node.
+// If the receiver node is not a key-value pair, a "Not A File" error will be returned.
+func (n *Node) Read() (string, error) {
+	if n.IsDir() {
+		return "", etcdErr.NewError(102, "")
+	}
+
+	return n.Value, nil
+}
+
+// Set function set the value of the node to the given value.
+// If the receiver node is a directory, a "Not A File" error will be returned.
+func (n *Node) Write(value string) error {
+	if n.IsDir() {
+		return etcdErr.NewError(102, "")
+	}
+
+	n.Value = value
+
+	return nil
+}
+
+// List function return a slice of nodes under the receiver node.
+// If the receiver node is not a directory, a "Not A Directory" error will be returned.
+func (n *Node) List() ([]*Node, error) {
+	n.mu.Lock()
+	n.mu.Unlock()
+	if !n.IsDir() {
+		return nil, etcdErr.NewError(104, "")
+	}
+
+	nodes := make([]*Node, len(n.Children))
+
+	i := 0
+	for _, node := range n.Children {
+		nodes[i] = node
+		i++
+	}
+
+	return nodes, nil
+}
+
+// Add function adds a node to the receiver node.
+// If the receiver is not a directory, a "Not A Directory" error will be returned.
+// If there is a existing node with the same name under the directory, a "Already Exist"
+// error will be returned
+func (n *Node) Add(child *Node) error {
+	n.mu.Lock()
+	n.mu.Unlock()
+	if n.status == removed {
+		return etcdErr.NewError(100, "")
+	}
+
+	if !n.IsDir() {
+		return etcdErr.NewError(104, "")
+	}
+
+	_, name := filepath.Split(child.Path)
+
+	_, ok := n.Children[name]
+
+	if ok {
+		return etcdErr.NewError(105, "")
+	}
+
+	n.Children[name] = child
+
+	return nil
+
+}
+
+// Clone function clone the node recursively and return the new node.
+// If the node is a directory, it will clone all the content under this directory.
+// If the node is a key-value pair, it will clone the pair.
+func (n *Node) Clone() *Node {
+	n.mu.Lock()
+	n.mu.Unlock()
+	if !n.IsDir() {
+		return newFile(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
+	}
+
+	clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL)
+
+	for key, child := range n.Children {
+		clone.Children[key] = child.Clone()
+	}
+
+	return clone
+}
+
+// IsDir function checks whether the node is a directory.
+// If the node is a directory, the function will return true.
+// Otherwise the function will return false.
+func (n *Node) IsDir() bool {
+
+	if n.Children == nil { // key-value pair
+		return false
+	}
+
+	return true
+}
+
+func (n *Node) Expire() {
+	for {
+		duration := n.ExpireTime.Sub(time.Now())
+		if duration <= 0 {
+			n.Remove(true)
+			return
+		}
+
+		select {
+		// if timeout, delete the node
+		case <-time.After(duration):
+			n.Remove(true)
+			return
+
+		// if removed, return
+		case <-n.removeChan:
+			fmt.Println("node removed")
+			return
+
+		}
+	}
+}

+ 75 - 0
file_system/watcher.go

@@ -0,0 +1,75 @@
+package fileSystem
+
+import (
+	"container/list"
+	"path/filepath"
+	"strings"
+)
+
+type watcherHub struct {
+	watchers     map[string]*list.List
+	count        uint64 // current number of watchers
+	EventHistory *EventHistory
+}
+
+func newWatchHub(capacity int) *watcherHub {
+	return &watcherHub{
+		watchers:     make(map[string]*list.List),
+		EventHistory: newEventHistory(capacity),
+	}
+}
+
+func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) {
+	eventChan := make(chan *Event, 1)
+
+	e, err := wh.EventHistory.scan(prefix, index)
+
+	if err != nil {
+		return err, nil
+	}
+
+	if e != nil {
+		eventChan <- e
+		return nil, eventChan
+	}
+
+	l, ok := wh.watchers[prefix]
+
+	if ok {
+		l.PushBack(eventChan)
+	} else {
+		l := list.New()
+		l.PushBack(eventChan)
+		wh.watchers[prefix] = l
+	}
+
+	return nil, eventChan
+}
+
+func (wh *watcherHub) notify(e *Event) {
+
+	segments := strings.Split(e.Key, "/")
+	currPath := "/"
+
+	// walk through all the paths
+	for _, segment := range segments {
+		currPath = filepath.Join(currPath, segment)
+
+		l, ok := wh.watchers[currPath]
+
+		if ok {
+
+			for {
+				element := l.Front()
+				if element == nil {
+					delete(wh.watchers, currPath)
+					break
+				}
+				c, _ := element.Value.(chan *Event)
+				c <- e
+				l.Remove(element)
+			}
+		}
+
+	}
+}

+ 31 - 0
file_system/watcher_test.go

@@ -0,0 +1,31 @@
+package fileSystem
+
+import (
+	"testing"
+)
+
+func TestWatch(t *testing.T) {
+	wh := newWatchHub(100)
+	err, c := wh.watch("/foo", 0)
+
+	if err != nil {
+		t.Fatal("%v", err)
+	}
+
+	select {
+	case <-c:
+		t.Fatal("should not receive from channel before send the event")
+	default:
+		// do nothing
+	}
+
+	e := newEvent(Set, "/foo/bar", 1, 0)
+
+	wh.notify(e)
+
+	re := <-c
+
+	if e != re {
+		t.Fatal("recv != send")
+	}
+}

+ 16 - 15
raft_server.go

@@ -16,13 +16,13 @@ import (
 
 type raftServer struct {
 	*raft.Server
-	version   string
-	joinIndex uint64
-	name      string
-	url       string
+	version    string
+	joinIndex  uint64
+	name       string
+	url        string
 	listenHost string
-	tlsConf   *TLSConfig
-	tlsInfo   *TLSInfo
+	tlsConf    *TLSConfig
+	tlsInfo    *TLSInfo
 }
 
 var r *raftServer
@@ -30,7 +30,7 @@ var r *raftServer
 func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 
 	// Create transporter for raft
-	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
+	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout)
 
 	// Create raft server
 	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
@@ -38,13 +38,13 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 	check(err)
 
 	return &raftServer{
-		Server:  server,
-		version: raftVersion,
-		name:    name,
-		url:     url,
+		Server:     server,
+		version:    raftVersion,
+		name:       name,
+		url:        url,
 		listenHost: listenHost,
-		tlsConf: tlsConf,
-		tlsInfo: tlsInfo,
+		tlsConf:    tlsConf,
+		tlsInfo:    tlsInfo,
 	}
 }
 
@@ -169,7 +169,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
 // getVersion fetches the raft version of a peer. This works for now but we
 // will need to do something more sophisticated later when we allow mixed
 // version clusters.
-func getVersion(t transporter, versionURL url.URL) (string, error) {
+func getVersion(t *transporter, versionURL url.URL) (string, error) {
 	resp, err := t.Get(versionURL.String())
 
 	if err != nil {
@@ -198,6 +198,7 @@ func joinCluster(cluster []string) bool {
 			if _, ok := err.(etcdErr.Error); ok {
 				fatal(err)
 			}
+
 			debugf("cannot join to cluster via machine %s %s", machine, err)
 		}
 	}
@@ -209,7 +210,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
 	var b bytes.Buffer
 
 	// t must be ok
-	t, _ := r.Transporter().(transporter)
+	t, _ := r.Transporter().(*transporter)
 
 	// Our version must match the leaders version
 	versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}

+ 55 - 11
transporter.go

@@ -9,17 +9,25 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"time"
 )
 
 // Transporter layer for communication between raft nodes
 type transporter struct {
-	client *http.Client
+	client  *http.Client
+	timeout time.Duration
+}
+
+// response struct
+type transporterResponse struct {
+	resp *http.Response
+	err  error
 }
 
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
-func newTransporter(scheme string, tlsConf tls.Config) transporter {
+func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
 	t := transporter{}
 
 	tr := &http.Transport{
@@ -32,8 +40,9 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter {
 	}
 
 	t.client = &http.Client{Transport: tr}
+	t.timeout = timeout
 
-	return t
+	return &t
 }
 
 // Dial with timeout
@@ -42,7 +51,7 @@ func dialTimeout(network, addr string) (net.Conn, error) {
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
-func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
+func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -69,7 +78,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 }
 
 // Sends RequestVote RPCs to a peer when the server is the candidate.
-func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
+func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
 	var rvrsp *raft.RequestVoteResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -95,7 +104,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 }
 
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
-func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
+func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
 	var aersp *raft.SnapshotResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -123,7 +132,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 }
 
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
-func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
+func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
 	var aersp *raft.SnapshotRecoveryResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -150,11 +159,46 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 }
 
 // Send server side POST request
-func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
-	return t.client.Post(path, "application/json", body)
+func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
+
+	c := make(chan *transporterResponse, 1)
+
+	go func() {
+		tr := new(transporterResponse)
+		tr.resp, tr.err = t.client.Post(path, "application/json", body)
+		c <- tr
+	}()
+
+	return t.waitResponse(c)
+
 }
 
 // Send server side GET request
-func (t transporter) Get(path string) (*http.Response, error) {
-	return t.client.Get(path)
+func (t *transporter) Get(path string) (*http.Response, error) {
+
+	c := make(chan *transporterResponse, 1)
+
+	go func() {
+		tr := new(transporterResponse)
+		tr.resp, tr.err = t.client.Get(path)
+		c <- tr
+	}()
+
+	return t.waitResponse(c)
+}
+
+func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
+
+	timeoutChan := time.After(t.timeout)
+
+	select {
+	case <-timeoutChan:
+		return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
+
+	case r := <-responseChan:
+		return r.resp, r.err
+	}
+
+	// for complier
+	return nil, nil
 }

+ 36 - 0
transporter_test.go

@@ -0,0 +1,36 @@
+package main
+
+import (
+	"crypto/tls"
+	"testing"
+	"time"
+)
+
+func TestTransporterTimeout(t *testing.T) {
+
+	conf := tls.Config{}
+
+	ts := newTransporter("http", conf, time.Second)
+
+	ts.Get("http://google.com")
+	_, err := ts.Get("http://google.com:9999") // it doesn't exisit
+	if err == nil || err.Error() != "Wait Response Timeout: 1s" {
+		t.Fatal("timeout error: ", err.Error())
+	}
+
+	_, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit
+	if err == nil || err.Error() != "Wait Response Timeout: 1s" {
+		t.Fatal("timeout error: ", err.Error())
+	}
+
+	_, err = ts.Get("http://www.google.com")
+	if err != nil {
+		t.Fatal("get error")
+	}
+
+	_, err = ts.Post("http://www.google.com", nil)
+	if err != nil {
+		t.Fatal("post error")
+	}
+
+}

+ 6 - 1
util.go

@@ -114,11 +114,16 @@ func sanitizeListenHost(listen string, advertised string) string {
 		fatal(err)
 	}
 
-	_, aport, err := net.SplitHostPort(aurl.Host)
+	ahost, aport, err := net.SplitHostPort(aurl.Host)
 	if err != nil {
 		fatal(err)
 	}
 
+	// If the listen host isn't set use the advertised host
+	if listen == "" {
+		listen = ahost
+	}
+
 	return net.JoinHostPort(listen, aport)
 }