Browse Source

add new files

Xiang Li 12 years ago
parent
commit
cc2608e9f8
9 changed files with 768 additions and 0 deletions
  1. 201 0
      store/store.go
  2. 126 0
      store/store_test.go
  3. 85 0
      store/tree_store.bak
  4. 80 0
      store/watcher.go
  5. 29 0
      store/watcher_test.go
  6. 30 0
      web/conn.go
  7. 87 0
      web/home.html
  8. 61 0
      web/hub.go
  9. 69 0
      web/web.go

+ 201 - 0
store/store.go

@@ -0,0 +1,201 @@
+package store
+
+import (
+	"path"
+	"encoding/json"
+	"time"
+	"fmt"
+	)
+
+// CONSTANTS
+const (
+	ERROR = -1 + iota
+	SET 
+	DELETE
+	GET
+)
+
+type Store struct {
+	Nodes map[string]Node  `json:"nodes"`
+}
+
+type Node struct {
+	Value string	`json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+	update chan time.Time `json:"-"`
+}
+
+type Response struct {
+	Action	 int    `json:"action"`
+	Key      string `json:"key"`
+	OldValue string `json:"oldValue"`
+	NewValue string `json:"newValue"`
+	Exist 	 bool `json:"exist"`
+	Expiration time.Time `json:"expiration"`
+}
+
+
+// global store
+var s *Store
+
+func init() {
+	s = createStore()
+}
+
+// make a new stroe
+func createStore() *Store{
+	s := new(Store)
+	s.Nodes = make(map[string]Node)
+	return s
+}
+
+func GetStore() *Store {
+	return s
+}
+
+// set the key to value, return the old value if the key exists 
+func Set(key string, value string, expireTime time.Time) Response {
+
+	key = path.Clean(key)
+
+	var isExpire bool = false
+
+	isExpire = !expireTime.Equal(time.Unix(0,0))
+
+	// when the slow follower receive the set command
+	// the key may be expired, we need also to delete 
+	// the previous value of key
+	if isExpire && expireTime.Sub(time.Now()) < 0 {
+		return Delete(key)
+	}
+
+	node, ok := s.Nodes[key]
+
+	if ok {
+		//update := make(chan time.Time)
+		//s.Nodes[key] = Node{value, expireTime, update}
+
+		node.ExpireTime = expireTime
+		node.Value = value
+		notify(SET, key, node.Value, value, true)
+		// if node is not permanent before 
+		// update its expireTime
+		if !node.ExpireTime.Equal(time.Unix(0,0)) {
+				node.update <- expireTime
+
+		} else {
+
+			// if we want the permanent to have expire time
+			// we need to create a chan and create a func
+			if isExpire {
+				node.update = make(chan time.Time)
+
+				go expire(key, node.update, expireTime)
+			}
+		}
+
+		return Response{SET, key, node.Value, value, true, expireTime}
+
+	} else {
+
+		update := make(chan time.Time)
+
+		s.Nodes[key] = Node{value, expireTime, update}
+
+		notify(SET, key, "", value, false)
+
+		if isExpire {
+			go expire(key, update, expireTime)
+		}
+		
+		return Response{SET, key, "", value, false, time.Unix(0, 0)}
+	}
+}
+
+// delete the key when it expires
+func expire(key string, update chan time.Time, expireTime time.Time) {
+	duration := expireTime.Sub(time.Now())
+
+	for {
+		select {
+		// timeout delte key
+		case <-time.After(duration):
+			fmt.Println("expired at ", time.Now())
+			Delete(key)
+			return
+		case updateTime := <-update:
+			//update duration
+			if updateTime.Equal(time.Unix(0,0)) {
+				fmt.Println("node became stable")
+				return
+			}
+			duration = updateTime.Sub(time.Now())
+		}
+	}
+}
+
+// get the value of the key
+func Get(key string) Response {
+	key = path.Clean(key)
+
+	node, ok := s.Nodes[key]
+
+	if ok {
+		return Response{GET, key, node.Value, node.Value, true, node.ExpireTime}
+	} else {
+		return Response{GET, key, "", "", false, time.Unix(0, 0)}
+	}
+}
+
+// delete the key, return the old value if the key exists
+func Delete(key string) Response {
+	key = path.Clean(key)
+
+	node, ok := s.Nodes[key]
+
+	if ok {
+		delete(s.Nodes, key)
+
+		notify(DELETE, key, node.Value, "", true)
+
+		return Response{DELETE, key, node.Value, "", true, node.ExpireTime}
+	} else {
+		return Response{DELETE, key, "", "", false, time.Unix(0, 0)}
+	}
+}
+
+// save the current state of the storage system
+func (s *Store)Save() ([]byte, error) {
+	b, err := json.Marshal(s)
+	if err != nil {
+		fmt.Println(err)
+		return nil, err
+	}
+	return b, nil
+}
+
+// recovery the state of the stroage system from a previous state
+func (s *Store)Recovery(state []byte) error {
+	err := json.Unmarshal(state, s)
+	clean()
+	return err
+}
+
+// clean all expired keys
+func clean() {
+	for key, node := range s.Nodes{
+		// stable node
+		if node.ExpireTime.Equal(time.Unix(0,0)) {
+			continue
+		} else {
+			if node.ExpireTime.Sub(time.Now()) >= time.Second {
+				node.update = make(chan time.Time)
+				go expire(key, node.update, node.ExpireTime)
+			} else {
+				// we should delete this node
+				delete(s.Nodes, key)
+			}
+		}
+
+	}
+}

+ 126 - 0
store/store_test.go

@@ -0,0 +1,126 @@
+package store
+
+import (
+	"testing"
+	"time"
+	"fmt"
+)
+
+func TestStoreGet(t *testing.T) {
+
+	Set("foo", "bar", time.Unix(0, 0))
+
+	res := Get("foo")
+
+	if res.NewValue != "bar" {
+		t.Fatalf("Cannot get stored value")
+	}
+
+	Delete("foo")
+	res = Get("foo")
+
+	if res.Exist {
+		t.Fatalf("Got deleted value")
+	}
+}
+
+// func TestSaveAndRecovery(t *testing.T) {
+
+// 	Set("foo", "bar", time.Unix(0, 0))
+// 	Set("foo2", "bar2", time.Now().Add(time.Second * 5))
+// 	state, err := s.Save()
+
+// 	if err != nil {
+// 		t.Fatalf("Cannot Save")
+// 	}
+
+// 	newStore := createStore()
+
+// 	// wait for foo2 expires
+// 	time.Sleep(time.Second * 6)
+
+// 	newStore.Recovery(state)
+
+// 	res := newStore.Get("foo")
+
+// 	if res.OldValue != "bar" {
+// 		t.Fatalf("Cannot recovery")
+// 	}
+
+// 	res = newStore.Get("foo2")
+
+// 	if res.Exist {
+// 		t.Fatalf("Get expired value")
+// 	}
+
+
+// 	s.Delete("foo")
+
+// }
+
+func TestExpire(t *testing.T) {
+	fmt.Println(time.Now())
+	fmt.Println("TEST EXPIRE")
+
+	// test expire
+	Set("foo", "bar", time.Now().Add(time.Second * 1))
+	time.Sleep(2*time.Second)
+
+	res := Get("foo")
+
+	if res.Exist {
+		t.Fatalf("Got expired value")
+	}
+
+	//test change expire time
+	Set("foo", "bar", time.Now().Add(time.Second * 10))
+
+	res = Get("foo")
+
+	if !res.Exist {
+		t.Fatalf("Cannot get Value")
+	}
+
+	Set("foo", "barbar", time.Now().Add(time.Second * 1))
+
+	time.Sleep(2 * time.Second)
+
+	res = Get("foo")
+
+	if res.Exist {
+		t.Fatalf("Got expired value")
+	}
+
+
+	// test change expire to stable
+	Set("foo", "bar", time.Now().Add(time.Second * 1))
+
+	Set("foo", "bar", time.Unix(0,0))
+
+	time.Sleep(2*time.Second)
+
+	res = s.Get("foo")
+
+	if !res.Exist {
+		t.Fatalf("Cannot get Value")
+	}
+
+	// test stable to expire 
+	s.Set("foo", "bar", time.Now().Add(time.Second * 1))
+	time.Sleep(2*time.Second)
+	res = s.Get("foo")
+
+	if res.Exist {
+		t.Fatalf("Got expired value")
+	}
+
+	// test set older node 
+	s.Set("foo", "bar", time.Now().Add(-time.Second * 1))
+	res = s.Get("foo")
+
+	if res.Exist {
+		t.Fatalf("Got expired value")
+	}
+
+
+}

+ 85 - 0
store/tree_store.bak

@@ -0,0 +1,85 @@
+package main
+
+import (
+	"path"
+	"strings"
+	)
+
+type store struct {
+	nodes map[string]node
+}
+
+type node struct {
+	value string
+	dir bool // just for clearity
+	nodes map[string]node
+}
+
+// set the key to value, return the old value if the key exists 
+func (s *store) set(key string, value string) string, error {
+
+	key = path.Clean(key)
+
+	nodeNames := strings.Split(key, "/")
+
+	levelNodes := s.nodes
+	for i = 0; i < len(nodes) - 1; ++i {
+		node, ok := levelNodes[nodeNames[i]]
+		// add new dir
+		if !ok {
+			node := Node{nodeNames[i], true, make(map[string]node)}
+			levelNodes[nodeNames[i]] := node
+		} else if ok && !node.dir {
+			return nil, errors.New("The key is a directory")
+		}
+		else {
+			levelNodes = levelNodes.nodes
+		}
+	}
+	// add the last node and value
+	node, ok := levelNodes[nodeNames[i]]
+
+	if !ok {
+		node := Node{nodeNames[i], false, nil}
+		levelNodes[nodeNames] = node
+		return nil, nil
+	} else {
+		oldValue := node.value
+		node.value = value
+		return oldValue ,nil
+	}
+
+}
+
+// get the node of the key
+func (s *store) get(key string) node {
+	key = path.Clean(key)
+
+	nodeNames := strings.Split(key, "/")
+
+	levelNodes := s.nodes
+	
+	for i = 0; i < len(nodes) - 1; ++i {
+		node, ok := levelNodes[nodeNames[i]]
+		if !ok || !node.dir {
+			return nil
+		}
+		levelNodes = levelNodes.nodes
+	}
+
+	node, ok := levelNodes[nodeNames[i]]
+	if ok {
+		return node
+	}
+	return nil
+
+}
+
+// delete the key, return the old value if the key exists
+func (s *store) delete(key string) string {
+	return nil
+}
+
+func (n *node) Value() string{
+	return n.value
+}

+ 80 - 0
store/watcher.go

@@ -0,0 +1,80 @@
+package store
+
+import (
+	"path"
+	"strings"
+	//"fmt"
+	"time"
+	)
+
+
+type Watchers struct {
+	chanMap map[string][]chan Response
+}
+
+// global watcher
+var w *Watchers
+
+
+// init the global watcher
+func init() {
+	w = createWatcher()
+}
+
+// create a new watcher
+func createWatcher() *Watchers {
+	w := new(Watchers)
+	w.chanMap = make(map[string][]chan Response)
+	return w
+}
+
+func Watcher() *Watchers {
+	return w
+}
+
+// register a function with channel and prefix to the watcher
+func AddWatcher(prefix string, c chan Response) error {
+
+	prefix = "/" + path.Clean(prefix)
+
+	_, ok := w.chanMap[prefix]
+	if !ok {
+		w.chanMap[prefix] = make([]chan Response, 0)
+		w.chanMap[prefix] = append(w.chanMap[prefix], c)
+	} else {
+		w.chanMap[prefix] = append(w.chanMap[prefix], c)
+	}
+
+	return nil
+}
+
+// notify the watcher a action happened
+func notify(action int, key string, oldValue string, newValue string, exist bool) error {
+	key = path.Clean(key)
+	segments := strings.Split(key, "/")
+	currPath := "/"
+
+	// walk through all the pathes
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+
+		chans, ok := w.chanMap[currPath]
+
+		if ok {
+
+			n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)}
+
+			// notify all the watchers
+			for _, c := range chans {
+				c <- n
+			}
+			
+			// we have notified all the watchers at this path
+			// delete the map
+			delete(w.chanMap, currPath)
+		}
+
+	}
+
+	return nil
+}

+ 29 - 0
store/watcher_test.go

@@ -0,0 +1,29 @@
+package store
+
+import (
+	"testing"
+	"fmt"
+	"time"
+)
+
+func TestWatch(t *testing.T) {
+	// watcher := createWatcher()
+	c := make(chan Response)
+	d := make(chan Response)
+	w.add("/", c)
+	go say(c)
+	w.add("/prefix/", d)
+	go say(d)
+	s.Set("/prefix/foo", "bar", time.Unix(0, 0))
+}
+
+func say(c chan Response) {
+	result := <-c
+
+	if result.Action != -1 {
+		fmt.Println("yes")
+	} else {
+		fmt.Println("no")
+	}
+
+}

+ 30 - 0
web/conn.go

@@ -0,0 +1,30 @@
+package web
+ 
+import (
+	"code.google.com/p/go.net/websocket"
+)
+ 
+type connection struct {
+	// The websocket connection.
+	ws *websocket.Conn
+ 
+	// Buffered channel of outbound messages.
+	send chan string
+}
+ 
+func (c *connection) writer() {
+	for message := range c.send {
+		err := websocket.Message.Send(c.ws, message)
+		if err != nil {
+			break
+		}
+	}
+	c.ws.Close()
+}
+ 
+func wsHandler(ws *websocket.Conn) {
+	c := &connection{send: make(chan string, 256), ws: ws}
+	h.register <- c
+	defer func() { h.unregister <- c }()
+	c.writer()
+}

+ 87 - 0
web/home.html

@@ -0,0 +1,87 @@
+<html>
+<head>
+<title>Alpaca Web Interface</title>
+<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
+<script type="text/javascript">
+    $(function() {
+
+    var conn;
+    var msg = $("#msg");
+    var log = $("#log");
+
+    function appendLog(msg) {
+        var d = log[0]
+        var doScroll = d.scrollTop == d.scrollHeight - d.clientHeight;
+        msg.appendTo(log)
+        if (doScroll) {
+            d.scrollTop = d.scrollHeight - d.clientHeight;
+        }
+    }
+
+    $("#form").submit(function() {
+        if (!conn) {
+            return false;
+        }
+        if (!msg.val()) {
+            return false;
+        }
+        conn.send(msg.val());
+        msg.val("");
+        return false
+    });
+
+    if (window["WebSocket"]) {
+        conn = new WebSocket("ws://{{$}}/ws");
+        conn.onclose = function(evt) {
+            appendLog($("<div><b>Connection closed.</b></div>"))
+        }
+        conn.onmessage = function(evt) {
+            appendLog($("<div/>").text(evt.data))
+        }
+    } else {
+        appendLog($("<div><b>Your browser does not support WebSockets.</b></div>"))
+    }
+    });
+</script>
+<style type="text/css">
+html {
+    overflow: hidden;
+}
+
+body {
+    overflow: hidden;
+    padding: 0;
+    margin: 0;
+    width: 100%;
+    height: 100%;
+    background: gray;
+}
+
+#log {
+    background: white;
+    margin: 0;
+    padding: 0.5em 0.5em 0.5em 0.5em;
+    position: absolute;
+    top: 0.5em;
+    left: 0.5em;
+    right: 0.5em;
+    bottom: 3em;
+    overflow: auto;
+}
+
+#form {
+    padding: 0 0.5em 0 0.5em;
+    margin: 0;
+    position: absolute;
+    bottom: 1em;
+    left: 0px;
+    width: 100%;
+    overflow: hidden;
+}
+
+</style>
+</head>
+<body>
+<div id="log"></div>
+</body>
+</html>

+ 61 - 0
web/hub.go

@@ -0,0 +1,61 @@
+package web
+
+type hub struct {
+    // status
+    open    bool
+
+    // Registered connections.
+    connections map[*connection]bool
+
+    // Inbound messages from the connections.
+    broadcast chan string
+
+    // Register requests from the connections.
+    register chan *connection
+
+    // Unregister requests from connections.
+    unregister chan *connection
+}
+
+var h = hub{
+    open:   false,
+    broadcast:   make(chan string),
+    register:    make(chan *connection),
+    unregister:  make(chan *connection),
+    connections: make(map[*connection]bool),
+}
+
+func Hub() *hub{
+    return &h
+}
+
+func HubOpen() bool {
+    return h.open
+}
+
+func (h *hub) run() {
+    h.open = true
+    for {
+        select {
+        case c := <-h.register:
+            h.connections[c] = true
+        case c := <-h.unregister:
+            delete(h.connections, c)
+            close(c.send)
+        case m := <-h.broadcast:
+            for c := range h.connections {
+                select {
+                case c.send <- m:
+                default:
+                    delete(h.connections, c)
+                    close(c.send)
+                    go c.ws.Close()
+                }
+            }
+        }
+    }
+}
+
+func (h *hub) Send(msg string) {
+    h.broadcast <- msg
+}

+ 69 - 0
web/web.go

@@ -0,0 +1,69 @@
+package web
+
+import (
+    "fmt"
+    "net/http"
+    "github.com/xiangli-cmu/raft-etcd/store"
+    "github.com/benbjohnson/go-raft"
+    "time"
+    "code.google.com/p/go.net/websocket"
+    "html/template"
+)
+
+var s *raft.Server
+
+type MainPage struct {
+    Leader string
+    Address string
+}
+
+func handler(w http.ResponseWriter, r *http.Request) {
+    fmt.Fprintf(w, "Leader:\n%s\n", s.Leader())
+    fmt.Fprintf(w, "Peers:\n")
+
+    for peerName, _ := range s.Peers() {
+        fmt.Fprintf(w, "%s\n", peerName)
+    }
+
+
+    fmt.Fprintf(w, "Data\n")
+
+    s := store.GetStore()
+
+    for key, node := range s.Nodes {
+        if node.ExpireTime.Equal(time.Unix(0,0)) {
+            fmt.Fprintf(w, "%s %s\n", key, node.Value)
+        } else {
+            fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime)
+        }
+    }
+
+    time.Sleep(10 * time.Second)
+
+}
+
+var mainTempl = template.Must(template.ParseFiles("home.html"))
+
+func mainHandler(c http.ResponseWriter, req *http.Request) {
+
+    p := &MainPage{Leader: s.Leader(),
+        Address: s.Name(),}
+
+    mainTempl.Execute(c, p)
+}
+
+
+func Start(server *raft.Server, port int) {
+	s = server
+
+    go h.run()
+    http.HandleFunc("/", mainHandler)
+    http.Handle("/ws", websocket.Handler(wsHandler))
+
+    //http.HandleFunc("/", handler)
+    fmt.Println("web listening at port ", port)
+    http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
+}
+
+
+