Browse Source

add web interface

Xiang Li 12 years ago
parent
commit
68884a7137
7 changed files with 37 additions and 509 deletions
  1. 20 7
      command.go
  2. 17 0
      raftd.go
  3. 0 185
      store.go
  4. 0 126
      store_test.go
  5. 0 85
      tree_store.bak
  6. 0 77
      watcher.go
  7. 0 29
      watcher_test.go

+ 20 - 7
command.go

@@ -9,6 +9,8 @@ import (
 	"github.com/benbjohnson/go-raft"
 	"encoding/json"
 	"time"
+	"github.com/xiangli-cmu/raft-etcd/store"
+	"github.com/xiangli-cmu/raft-etcd/web"
 	)
 
 // A command represents an action to be taken on the replicated state machine.
@@ -36,8 +38,12 @@ func (c *SetCommand) CommandName() string {
 
 // Set the value of key to value
 func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
-	res := s.Set(c.Key, c.Value, c.ExpireTime)
-	return json.Marshal(res)
+	res := store.Set(c.Key, c.Value, c.ExpireTime)
+	msg, err := json.Marshal(res)
+	if err == nil && web.HubOpen(){
+		web.Hub().Send(string(msg))
+	}
+	return msg, err
 }
 
 // Get the path for http request
@@ -75,7 +81,7 @@ func (c *GetCommand) CommandName() string {
 
 // Set the value of key to value
 func (c *GetCommand) Apply(server *raft.Server) ([]byte, error){
-	res := s.Get(c.Key)
+	res := store.Get(c.Key)
 	return json.Marshal(res)
 }
 
@@ -112,8 +118,15 @@ func (c *DeleteCommand) CommandName() string {
 
 // Delete the key 
 func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
-	res := s.Delete(c.Key)
-	return json.Marshal(res)
+	res := store.Delete(c.Key)
+
+	msg, err := json.Marshal(res)
+
+	if err == nil && web.HubOpen(){
+		web.Hub().Send(string(msg))
+	}
+	
+	return msg, err
 }
 
 func (c *DeleteCommand) GeneratePath() string{
@@ -148,10 +161,10 @@ func (c *WatchCommand) CommandName() string {
 }
 
 func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
-	ch := make(chan Response)
+	ch := make(chan store.Response)
 
 	// add to the watchers list
-	w.add(c.Key, ch)	
+	store.AddWatcher(c.Key, ch)	
 
 	// wait for the notification for any changing
 	res := <- ch

+ 17 - 0
raftd.go

@@ -14,6 +14,8 @@ import (
 	"os"
 	"time"
 	"strconv"
+	"github.com/xiangli-cmu/raft-etcd/web"
+	"github.com/xiangli-cmu/raft-etcd/store"
 )
 
 //------------------------------------------------------------------------------
@@ -25,11 +27,13 @@ import (
 var verbose bool
 var leaderHost string
 var address string
+var webPort int
 
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.StringVar(&leaderHost, "c", "", "join to a existing cluster")
 	flag.StringVar(&address, "a", "", "the address of the local machine")
+	flag.IntVar(&webPort, "w", -1, "the port of web interface")
 }
 
 const (
@@ -101,15 +105,19 @@ func main() {
 	t := transHandler{}
 
 	// Setup new raft server.
+	s := store.GetStore()
 	server, err = raft.NewServer(name, path, t, s, nil)
 	if err != nil {
 		fatal("%v", err)
 	}
 
 	server.LoadSnapshot()
+	debug("%s finished load snapshot", server.Name())
 	server.Initialize()
+	debug("%s finished init", server.Name())
 	server.SetElectionTimeout(ELECTIONTIMTOUT)
 	server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
+	debug("%s finished set timeout", server.Name())
 
 	if server.IsLogEmpty() {
 
@@ -122,6 +130,7 @@ func main() {
 			command := &JoinCommand{}
 			command.Name = server.Name()
 			server.Do(command)
+			debug("%s start as a leader", server.Name())
 
 		// start as a fellower in a existing cluster
 		} else {
@@ -136,6 +145,7 @@ func main() {
 	} else {
 		server.StartElectionTimeout()
 		server.StartFollower()
+		debug("%s start as a follower", server.Name())
 	}
 
 	// open the snapshot
@@ -155,6 +165,13 @@ func main() {
     http.HandleFunc("/delete/", DeleteHttpHandler)
     http.HandleFunc("/watch/", WatchHttpHandler)
 
+
+    if webPort != -1 {
+    	// start web
+    	
+    	go web.Start(server, webPort)
+    }
+
     // listen on http port
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
 }

+ 0 - 185
store.go

@@ -1,185 +0,0 @@
-package main
-
-import (
-	"path"
-	"encoding/json"
-	"time"
-	"fmt"
-	)
-
-// CONSTANTS
-const (
-	ERROR = -1 + iota
-	SET 
-	DELETE
-	GET
-)
-
-type Store struct {
-	Nodes map[string]Node  `json:"nodes"`
-}
-
-type Node struct {
-	Value string	`json:"value"`
-	ExpireTime time.Time `json:"expireTime"`
-	update chan time.Time `json:"-"`
-}
-
-type Response struct {
-	Action	 int    `json:"action"`
-	Key      string `json:"key"`
-	OldValue string `json:"oldValue"`
-	NewValue string `json:"newValue"`
-	Exist 	 bool `json:"exist"`
-	Expiration time.Time `json:"expiration"`
-}
-
-
-// global store
-var s *Store
-
-func init() {
-	s = createStore()
-}
-
-// make a new stroe
-func createStore() *Store{
-	s := new(Store)
-	s.Nodes = make(map[string]Node)
-	return s
-}
-
-// set the key to value, return the old value if the key exists 
-func (s *Store) Set(key string, value string, expireTime time.Time) Response {
-
-	key = path.Clean(key)
-
-	var expire bool = false
-
-	expire = !expireTime.Equal(time.Unix(0,0))
-
-	// when the slow follower receive the set command
-	// the key may be expired, we need also to delete 
-	// the previous value of key
-	if expire && expireTime.Sub(time.Now()) < 0 {
-		return s.Delete(key)
-	}
-
-	node, ok := s.Nodes[key]
-
-	if ok {
-		update := make(chan time.Time)
-		s.Nodes[key] = Node{value, expireTime, update}
-		w.notify(SET, key, node.Value, value, true)
-
-		// node is not permanent before
-		if !node.ExpireTime.Equal(time.Unix(0,0)) {
-				node.update <- expireTime
-		} else {
-			// if current node is not permanent
-			if expire {
-				go s.expire(key, update, expireTime)
-			}
-		}
-
-		return Response{SET, key, node.Value, value, true, expireTime}
-
-	} else {
-		update := make(chan time.Time)
-		s.Nodes[key] = Node{value, expireTime, update}
-		w.notify(SET, key, "", value, false)
-		if expire {
-			go s.expire(key, update, expireTime)
-		}
-		return Response{SET, key, "", value, false, time.Unix(0, 0)}
-	}
-}
-
-// delete the key when it expires
-func (s *Store) expire(key string, update chan time.Time, expireTime time.Time) {
-	duration := expireTime.Sub(time.Now())
-
-	for {
-		select {
-		// timeout delte key
-		case <-time.After(duration):
-			fmt.Println("expired at ", time.Now())
-			s.Delete(key)
-			return
-		case updateTime := <-update:
-			//update duration
-			if updateTime.Equal(time.Unix(0,0)) {
-				fmt.Println("node became stable")
-				return
-			}
-			duration = updateTime.Sub(time.Now())
-		}
-	}
-}
-
-// get the value of the key
-func (s *Store) Get(key string) Response {
-	key = path.Clean(key)
-
-	node, ok := s.Nodes[key]
-
-	if ok {
-		return Response{GET, key, node.Value, node.Value, true, node.ExpireTime}
-	} else {
-		return Response{GET, key, "", "", false, time.Unix(0, 0)}
-	}
-}
-
-// delete the key, return the old value if the key exists
-func (s *Store) Delete(key string) Response {
-	key = path.Clean(key)
-
-	node, ok := s.Nodes[key]
-
-	if ok {
-		delete(s.Nodes, key)
-
-		w.notify(DELETE, key, node.Value, "", true)
-
-		return Response{DELETE, key, node.Value, "", true, node.ExpireTime}
-	} else {
-		return Response{DELETE, key, "", "", false, time.Unix(0, 0)}
-	}
-}
-
-// save the current state of the storage system
-func (s *Store) Save() ([]byte, error) {
-	b, err := json.Marshal(s)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	return b, nil
-}
-
-// recovery the state of the stroage system from a previous state
-func (s *Store) Recovery(state []byte) error {
-	err := json.Unmarshal(state, s)
-	s.clean()
-	return err
-}
-
-// clean all expired keys
-func (s *Store) clean() {
-	for key, node := range s.Nodes{
-		// stable node
-		if node.ExpireTime.Equal(time.Unix(0,0)) {
-			continue
-		} else {
-			if node.ExpireTime.Sub(time.Now()) >= time.Second {
-				node.update = make(chan time.Time)
-				go s.expire(key, node.update, node.ExpireTime)
-			} else {
-				// we should delete this node
-				delete(s.Nodes, key)
-			}
-		}
-
-	}
-}
-

+ 0 - 126
store_test.go

@@ -1,126 +0,0 @@
-package main
-
-import (
-	"testing"
-	"time"
-	"fmt"
-)
-
-func TestStoreGet(t *testing.T) {
-
-	s.Set("foo", "bar", time.Unix(0, 0))
-
-	res := s.Get("foo")
-
-	if res.NewValue != "bar" {
-		t.Fatalf("Cannot get stored value")
-	}
-
-	s.Delete("foo")
-	res = s.Get("foo")
-
-	if res.Exist {
-		t.Fatalf("Got deleted value")
-	}
-}
-
-func TestSaveAndRecovery(t *testing.T) {
-
-	s.Set("foo", "bar", time.Unix(0, 0))
-	s.Set("foo2", "bar2", time.Now().Add(time.Second * 5))
-	state, err := s.Save()
-
-	if err != nil {
-		t.Fatalf("Cannot Save")
-	}
-
-	newStore := createStore()
-
-	// wait for foo2 expires
-	time.Sleep(time.Second * 6)
-
-	newStore.Recovery(state)
-
-	res := newStore.Get("foo")
-
-	if res.OldValue != "bar" {
-		t.Fatalf("Cannot recovery")
-	}
-
-	res = newStore.Get("foo2")
-
-	if res.Exist {
-		t.Fatalf("Get expired value")
-	}
-
-
-	s.Delete("foo")
-
-}
-
-func TestExpire(t *testing.T) {
-	fmt.Println(time.Now())
-	fmt.Println("TEST EXPIRE")
-
-	// test expire
-	s.Set("foo", "bar", time.Now().Add(time.Second * 1))
-	time.Sleep(2*time.Second)
-
-	res := s.Get("foo")
-
-	if res.Exist {
-		t.Fatalf("Got expired value")
-	}
-
-	//test change expire time
-	s.Set("foo", "bar", time.Now().Add(time.Second * 10))
-
-	res = s.Get("foo")
-
-	if !res.Exist {
-		t.Fatalf("Cannot get Value")
-	}
-
-	s.Set("foo", "barbar", time.Now().Add(time.Second * 1))
-
-	time.Sleep(2 * time.Second)
-
-	res = s.Get("foo")
-
-	if res.Exist {
-		t.Fatalf("Got expired value")
-	}
-
-
-	// test change expire to stable
-	s.Set("foo", "bar", time.Now().Add(time.Second * 1))
-
-	s.Set("foo", "bar", time.Unix(0,0))
-
-	time.Sleep(2*time.Second)
-
-	res = s.Get("foo")
-
-	if !res.Exist {
-		t.Fatalf("Cannot get Value")
-	}
-
-	// test stable to expire 
-	s.Set("foo", "bar", time.Now().Add(time.Second * 1))
-	time.Sleep(2*time.Second)
-	res = s.Get("foo")
-
-	if res.Exist {
-		t.Fatalf("Got expired value")
-	}
-
-	// test set older node 
-	s.Set("foo", "bar", time.Now().Add(-time.Second * 1))
-	res = s.Get("foo")
-
-	if res.Exist {
-		t.Fatalf("Got expired value")
-	}
-
-
-}

+ 0 - 85
tree_store.bak

@@ -1,85 +0,0 @@
-package main
-
-import (
-	"path"
-	"strings"
-	)
-
-type store struct {
-	nodes map[string]node
-}
-
-type node struct {
-	value string
-	dir bool // just for clearity
-	nodes map[string]node
-}
-
-// set the key to value, return the old value if the key exists 
-func (s *store) set(key string, value string) string, error {
-
-	key = path.Clean(key)
-
-	nodeNames := strings.Split(key, "/")
-
-	levelNodes := s.nodes
-	for i = 0; i < len(nodes) - 1; ++i {
-		node, ok := levelNodes[nodeNames[i]]
-		// add new dir
-		if !ok {
-			node := Node{nodeNames[i], true, make(map[string]node)}
-			levelNodes[nodeNames[i]] := node
-		} else if ok && !node.dir {
-			return nil, errors.New("The key is a directory")
-		}
-		else {
-			levelNodes = levelNodes.nodes
-		}
-	}
-	// add the last node and value
-	node, ok := levelNodes[nodeNames[i]]
-
-	if !ok {
-		node := Node{nodeNames[i], false, nil}
-		levelNodes[nodeNames] = node
-		return nil, nil
-	} else {
-		oldValue := node.value
-		node.value = value
-		return oldValue ,nil
-	}
-
-}
-
-// get the node of the key
-func (s *store) get(key string) node {
-	key = path.Clean(key)
-
-	nodeNames := strings.Split(key, "/")
-
-	levelNodes := s.nodes
-	
-	for i = 0; i < len(nodes) - 1; ++i {
-		node, ok := levelNodes[nodeNames[i]]
-		if !ok || !node.dir {
-			return nil
-		}
-		levelNodes = levelNodes.nodes
-	}
-
-	node, ok := levelNodes[nodeNames[i]]
-	if ok {
-		return node
-	}
-	return nil
-
-}
-
-// delete the key, return the old value if the key exists
-func (s *store) delete(key string) string {
-	return nil
-}
-
-func (n *node) Value() string{
-	return n.value
-}

+ 0 - 77
watcher.go

@@ -1,77 +0,0 @@
-package main
-
-import (
-	"path"
-	"strings"
-	//"fmt"
-	"time"
-	)
-
-
-type Watcher struct {
-	chanMap map[string][]chan Response
-}
-
-// global watcher
-var w *Watcher
-
-// init the global watcher
-func init() {
-	w = createWatcher()
-}
-
-// create a new watcher
-func createWatcher() *Watcher {
-	w := new(Watcher)
-	w.chanMap = make(map[string][]chan Response)
-	return w
-}
-
-// register a function with channel and prefix to the watcher
-func (w *Watcher) add(prefix string, c chan Response) error {
-
-	prefix = "/" + path.Clean(prefix)
-	debug("Add a watche at ", prefix)
-
-	_, ok := w.chanMap[prefix]
-	if !ok {
-		w.chanMap[prefix] = make([]chan Response, 0)
-		w.chanMap[prefix] = append(w.chanMap[prefix], c)
-	} else {
-		w.chanMap[prefix] = append(w.chanMap[prefix], c)
-	}
-
-	return nil
-}
-
-// notify the watcher a action happened
-func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error {
-	key = path.Clean(key)
-	segments := strings.Split(key, "/")
-	currPath := "/"
-
-	// walk through all the pathes
-	for _, segment := range segments {
-		currPath = path.Join(currPath, segment)
-
-		chans, ok := w.chanMap[currPath]
-
-		if ok {
-			debug("Notify at %s", currPath)
-
-			n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)}
-
-			// notify all the watchers
-			for _, c := range chans {
-				c <- n
-			}
-			
-			// we have notified all the watchers at this path
-			// delete the map
-			delete(w.chanMap, currPath)
-		}
-
-	}
-
-	return nil
-}

+ 0 - 29
watcher_test.go

@@ -1,29 +0,0 @@
-package main
-
-import (
-	"testing"
-	"fmt"
-	"time"
-)
-
-func TestWatch(t *testing.T) {
-	// watcher := createWatcher()
-	c := make(chan Response)
-	d := make(chan Response)
-	w.add("/", c)
-	go say(c)
-	w.add("/prefix/", d)
-	go say(d)
-	s.Set("/prefix/foo", "bar", time.Unix(0, 0))
-}
-
-func say(c chan Response) {
-	result := <-c
-
-	if result.Action != -1 {
-		fmt.Println("yes")
-	} else {
-		fmt.Println("no")
-	}
-
-}