Browse Source

refactor; add save and recover

Xiang Li 12 years ago
parent
commit
2c9c278e4d

+ 12 - 12
etcd_handlers.go

@@ -176,18 +176,18 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er
 //--------------------------------------
 
 // Handler to return the current leader's raft address
-// func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
-// 	leader := r.Leader()
-
-// 	if leader != "" {
-// 		w.WriteHeader(http.StatusOK)
-// 		raftURL, _ := nameToRaftURL(leader)
-// 		w.Write([]byte(raftURL))
-// 		return nil
-// 	} else {
-// 		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "")
-// 	}
-// }
+func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
+	leader := r.Leader()
+
+	if leader != "" {
+		w.WriteHeader(http.StatusOK)
+		raftURL, _ := nameToRaftURL(leader)
+		w.Write([]byte(raftURL))
+		return nil
+	} else {
+		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "")
+	}
+}
 
 // Handler to return all the known machines in the current cluster
 func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {

+ 19 - 18
file_system/event.go

@@ -2,10 +2,11 @@ package fileSystem
 
 import (
 	"fmt"
-	etcdErr "github.com/coreos/etcd/error"
 	"strings"
 	"sync"
 	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
 )
 
 const (
@@ -61,26 +62,26 @@ func newEvent(action string, key string, index uint64, term uint64) *Event {
 }
 
 type eventQueue struct {
-	events   []*Event
-	size     int
-	front    int
-	capacity int
+	Events   []*Event
+	Size     int
+	Front    int
+	Capacity int
 }
 
 func (eq *eventQueue) back() int {
-	return (eq.front + eq.size - 1 + eq.capacity) % eq.capacity
+	return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
 }
 
 func (eq *eventQueue) insert(e *Event) {
 
-	index := (eq.back() + 1) % eq.capacity
+	index := (eq.back() + 1) % eq.Capacity
 
-	eq.events[index] = e
+	eq.Events[index] = e
 
-	if eq.size == eq.capacity { //dequeue
-		eq.front = (index + 1) % eq.capacity
+	if eq.Size == eq.Capacity { //dequeue
+		eq.Front = (index + 1) % eq.Capacity
 	} else {
-		eq.size++
+		eq.Size++
 	}
 
 }
@@ -94,8 +95,8 @@ type EventHistory struct {
 func newEventHistory(capacity int) *EventHistory {
 	return &EventHistory{
 		Queue: eventQueue{
-			capacity: capacity,
-			events:   make([]*Event, capacity),
+			Capacity: capacity,
+			Events:   make([]*Event, capacity),
 		},
 	}
 }
@@ -107,7 +108,7 @@ func (eh *EventHistory) addEvent(e *Event) {
 
 	eh.Queue.insert(e)
 
-	eh.StartIndex = eh.Queue.events[eh.Queue.front].Index
+	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
 }
 
 // scan function is enumerating events from the index in history and
@@ -129,19 +130,19 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 			)
 	}
 
-	if start >= uint64(eh.Queue.size) {
+	if start >= uint64(eh.Queue.Size) {
 		return nil, nil
 	}
 
-	i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity))
+	i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
 
 	for {
-		e := eh.Queue.events[i]
+		e := eh.Queue.Events[i]
 		if strings.HasPrefix(e.Key, prefix) {
 			return e, nil
 		}
 
-		i = (i + 1) % eh.Queue.capacity
+		i = (i + 1) % eh.Queue.Capacity
 
 		if i == eh.Queue.back() {
 			// TODO: Add error type

+ 4 - 4
file_system/event_test.go

@@ -19,15 +19,15 @@ func TestEventQueue(t *testing.T) {
 
 	// Test
 	j := 100
-	i := eh.Queue.front
-	n := eh.Queue.size
+	i := eh.Queue.Front
+	n := eh.Queue.Size
 	for ; n > 0; n-- {
-		e := eh.Queue.events[i]
+		e := eh.Queue.Events[i]
 		if e.Index != uint64(j) {
 			t.Fatalf("queue error!")
 		}
 		j++
-		i = (i + 1) % eh.Queue.capacity
+		i = (i + 1) % eh.Queue.Capacity
 
 	}
 

+ 41 - 10
file_system/file_system.go

@@ -1,6 +1,7 @@
 package fileSystem
 
 import (
+	"encoding/json"
 	"fmt"
 	"path"
 	"sort"
@@ -11,11 +12,10 @@ import (
 )
 
 type FileSystem struct {
-	Root         *Node
-	EventHistory *EventHistory
-	WatcherHub   *watcherHub
-	Index        uint64
-	Term         uint64
+	Root       *Node
+	WatcherHub *watcherHub
+	Index      uint64
+	Term       uint64
 }
 
 func New() *FileSystem {
@@ -126,7 +126,7 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 
 	// Node with TTL
 	if expireTime != Permanent {
-		go n.Expire()
+		n.Expire()
 		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
 	}
@@ -164,13 +164,13 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 	}
 
 	// update ttl
-	if n.ExpireTime != Permanent && expireTime != Permanent {
+	if !n.IsPermanent() && expireTime != Permanent {
 		n.stopExpire <- true
 	}
 
-	if expireTime != Permanent {
+	if expireTime.Sub(Permanent) != 0 {
 		n.ExpireTime = expireTime
-		go n.Expire()
+		n.Expire()
 		e.Expiration = &n.ExpireTime
 		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
 	}
@@ -298,7 +298,6 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*
 // If it does not exist, this function will create a new directory and return the pointer to that node.
 // If it is a file, this function will return error.
 func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
-
 	subDir, ok := parent.Children[dirName]
 
 	if ok {
@@ -311,3 +310,35 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
 
 	return n, nil
 }
+
+// Save function saves the static state of the store system.
+// Save function will not be able to save the state of watchers.
+// Save function will not save the parent field of the node. Or there will
+// be cyclic dependencies issue for the json package.
+func (fs *FileSystem) Save() []byte {
+	cloneFs := New()
+	cloneFs.Root = fs.Root.Clone()
+
+	b, err := json.Marshal(fs)
+
+	if err != nil {
+		panic(err)
+	}
+
+	return b
+}
+
+// recovery function recovery the store system from a static state.
+// It needs to recovery the parent field of the nodes.
+// It needs to delete the expired nodes since the saved time and also
+// need to create monitor go routines.
+func (fs *FileSystem) Recover(state []byte) {
+	err := json.Unmarshal(state, fs)
+
+	if err != nil {
+		panic(err)
+	}
+
+	fs.Root.recoverAndclean()
+
+}

+ 96 - 44
file_system/file_system_test.go

@@ -311,7 +311,6 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 		t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar")
 	}
 
-	//e, err = fs.TestAndSet("/foo", )
 }
 
 func TestWatch(t *testing.T) {
@@ -377,46 +376,17 @@ func TestWatch(t *testing.T) {
 
 }
 
-func createAndGet(fs *FileSystem, path string, t *testing.T) {
-	_, err := fs.Create(path, "bar", Permanent, 1, 1)
-
-	if err != nil {
-		t.Fatalf("cannot create %s=bar [%s]", path, err.Error())
-	}
-
-	e, err := fs.Get(path, false, false, 1, 1)
-
-	if err != nil {
-		t.Fatalf("cannot get %s [%s]", path, err.Error())
-	}
-
-	if e.Value != "bar" {
-		t.Fatalf("expect value of %s is bar [%s]", path, e.Value)
-	}
-
-}
-
-func nonblockingRetrive(c <-chan *Event) *Event {
-	select {
-	case e := <-c:
-		return e
-	default:
-		return nil
-	}
-}
-
 func TestSort(t *testing.T) {
 	fs := New()
 
 	// simulating random creation
 	keys := GenKeys(80, 4)
 
-	//t.Log(keys)
 	i := uint64(1)
 	for _, k := range keys {
 		_, err := fs.Create(k, "bar", Permanent, i, 1)
 		if err != nil {
-			//t.Logf("create node[%s] failed %s", k, err.Error())
+			panic(err)
 		} else {
 			i++
 		}
@@ -428,8 +398,7 @@ func TestSort(t *testing.T) {
 	}
 
 	for i, k := range e.KVPairs[:len(e.KVPairs)-1] {
-		//t.Log("root:")
-		//t.Log(k)
+
 		if k.Key >= e.KVPairs[i+1].Key {
 			t.Fatalf("sort failed, [%s] should be placed after [%s]", k.Key, e.KVPairs[i+1].Key)
 		}
@@ -445,23 +414,60 @@ func TestSort(t *testing.T) {
 	}
 }
 
-func recursiveTestSort(k KeyValuePair, t *testing.T) {
-	//t.Log("recursive in")
-	//t.Log(k)
-	for i, v := range k.KVPairs[:len(k.KVPairs)-1] {
-		if v.Key >= k.KVPairs[i+1].Key {
-			t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key)
+func TestSaveAndRecover(t *testing.T) {
+	fs := New()
+
+	// simulating random creation
+	keys := GenKeys(8, 4)
+
+	i := uint64(1)
+	for _, k := range keys {
+		_, err := fs.Create(k, "bar", Permanent, i, 1)
+		if err != nil {
+			panic(err)
+		} else {
+			i++
 		}
+	}
 
-		if v.Dir {
-			recursiveTestSort(v, t)
+	// create a node with expiration
+	// test if we can reach the node before expiration
+
+	expire := time.Now().Add(time.Second)
+	fs.Create("/foo/foo", "bar", expire, 1, 1)
+
+	b := fs.Save()
+
+	cloneFs := New()
+
+	time.Sleep(time.Second)
+
+	cloneFs.Recover(b)
+
+	for i, k := range keys {
+		_, err := cloneFs.Get(k, false, false, uint64(i), 1)
+		if err != nil {
+			panic(err)
 		}
+	}
 
+	if fs.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
+		t.Fatal("Error recovered event history start index")
 	}
 
-	if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir {
-		recursiveTestSort(v, t)
+	for i = 0; int(i) < fs.WatcherHub.EventHistory.Queue.Size; i++ {
+		if fs.WatcherHub.EventHistory.Queue.Events[i].Key !=
+			cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {
+			t.Fatal("Error recovered event history")
+		}
+	}
+
+	_, err := fs.Get("/foo/foo", false, false, 1, 1)
+
+	if err == nil || err.Error() != "Key Not Found" {
+		t.Fatalf("can get the node after deletion ")
 	}
+
 }
 
 // GenKeys randomly generate num of keys with max depth
@@ -473,8 +479,54 @@ func GenKeys(num int, depth int) []string {
 		depth := rand.Intn(depth) + 1
 
 		for j := 0; j < depth; j++ {
-			keys[i] += "/" + strconv.Itoa(rand.Int()%20)
+			keys[i] += "/" + strconv.Itoa(rand.Int())
 		}
 	}
 	return keys
 }
+
+func createAndGet(fs *FileSystem, path string, t *testing.T) {
+	_, err := fs.Create(path, "bar", Permanent, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot create %s=bar [%s]", path, err.Error())
+	}
+
+	e, err := fs.Get(path, false, false, 1, 1)
+
+	if err != nil {
+		t.Fatalf("cannot get %s [%s]", path, err.Error())
+	}
+
+	if e.Value != "bar" {
+		t.Fatalf("expect value of %s is bar [%s]", path, e.Value)
+	}
+
+}
+
+func recursiveTestSort(k KeyValuePair, t *testing.T) {
+
+	for i, v := range k.KVPairs[:len(k.KVPairs)-1] {
+		if v.Key >= k.KVPairs[i+1].Key {
+			t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key)
+		}
+
+		if v.Dir {
+			recursiveTestSort(v, t)
+		}
+
+	}
+
+	if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir {
+		recursiveTestSort(v, t)
+	}
+}
+
+func nonblockingRetrive(c <-chan *Event) *Event {
+	select {
+	case e := <-c:
+		return e
+	default:
+		return nil
+	}
+}

+ 52 - 26
file_system/node.go

@@ -1,7 +1,6 @@
 package fileSystem
 
 import (
-	"fmt"
 	"path"
 	"sort"
 	"sync"
@@ -25,7 +24,7 @@ type Node struct {
 	CreateTerm    uint64
 	ModifiedIndex uint64
 	ModifiedTerm  uint64
-	Parent        *Node
+	Parent        *Node `json:"-"`
 	ExpireTime    time.Time
 	ACL           string
 	Value         string           // for key-value pair
@@ -237,62 +236,89 @@ func (n *Node) Clone() *Node {
 	return clone
 }
 
-// IsDir function checks whether the node is a directory.
-// If the node is a directory, the function will return true.
-// Otherwise the function will return false.
-func (n *Node) IsDir() bool {
-
-	if n.Children == nil { // key-value pair
-		return false
+func (n *Node) recoverAndclean() {
+	if n.IsDir() {
+		for _, child := range n.Children {
+			child.Parent = n
+			child.recoverAndclean()
+		}
 	}
 
-	return true
+	n.stopExpire = make(chan bool, 1)
+
+	n.Expire()
 }
 
 func (n *Node) Expire() {
-	duration := n.ExpireTime.Sub(time.Now())
-	if duration <= 0 {
+	expired, duration := n.IsExpired()
+
+	if expired { // has been expired
 		n.Remove(true, nil)
 		return
 	}
 
-	select {
-	// if timeout, delete the node
-	case <-time.After(duration):
-		n.Remove(true, nil)
+	if duration == 0 { // Permanent Node
 		return
+	}
 
-	// if stopped, return
-	case <-n.stopExpire:
-		fmt.Println("expire stopped")
-		return
+	go func() { // do monitoring
+		select {
+		// if timeout, delete the node
+		case <-time.After(duration):
+			n.Remove(true, nil)
+			return
 
-	}
+		// if stopped, return
+		case <-n.stopExpire:
+			return
+
+		}
+	}()
 }
 
 // IsHidden function checks if the node is a hidden node. A hidden node
 // will begin with '_'
-
 // A hidden node will not be shown via get command under a directory
 // For example if we have /foo/_hidden and /foo/notHidden, get "/foo"
 // will only return /foo/notHidden
 func (n *Node) IsHidden() bool {
 	_, name := path.Split(n.Path)
 
-	if name[0] == '_' { //hidden
-		return true
+	return name[0] == '_'
+
+}
+
+func (n *Node) IsPermanent() bool {
+	return n.ExpireTime.Sub(Permanent) == 0
+}
+
+func (n *Node) IsExpired() (bool, time.Duration) {
+	if n.IsPermanent() {
+		return false, 0
+	}
+
+	duration := n.ExpireTime.Sub(time.Now())
+	if duration <= 0 {
+		return true, 0
 	}
 
-	return false
+	return false, duration
 }
 
-func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
+// IsDir function checks whether the node is a directory.
+// If the node is a directory, the function will return true.
+// Otherwise the function will return false.
+func (n *Node) IsDir() bool {
+	return !(n.Children == nil)
+}
 
+func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 	if n.IsDir() {
 		pair := KeyValuePair{
 			Key: n.Path,
 			Dir: true,
 		}
+
 		if !recurisive {
 			return pair
 		}

+ 2 - 1
file_system/watcher.go

@@ -99,7 +99,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 }
 
 func (wh *watcherHub) notify(e *Event) {
-
 	segments := strings.Split(e.Key, "/")
 
 	currPath := "/"
@@ -109,4 +108,6 @@ func (wh *watcherHub) notify(e *Event) {
 		currPath = path.Join(currPath, segment)
 		wh.notifyWithPath(e, currPath, false)
 	}
+
+	wh.EventHistory.addEvent(e)
 }