Browse Source

fix a expiration bug

Xiang Li 12 years ago
parent
commit
74650431e9
5 changed files with 72 additions and 117 deletions
  1. 2 16
      command.go
  2. 20 2
      raftd.go
  3. 50 10
      store/store.go
  4. 0 87
      web/home.html
  5. 0 2
      web/web.go

+ 2 - 16
command.go

@@ -10,7 +10,6 @@ import (
 	"encoding/json"
 	"time"
 	"github.com/xiangli-cmu/raft-etcd/store"
-	"github.com/xiangli-cmu/raft-etcd/web"
 	)
 
 // A command represents an action to be taken on the replicated state machine.
@@ -38,12 +37,7 @@ func (c *SetCommand) CommandName() string {
 
 // Set the value of key to value
 func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
-	res := store.Set(c.Key, c.Value, c.ExpireTime)
-	msg, err := json.Marshal(res)
-	if err == nil && web.HubOpen(){
-		web.Hub().Send(string(msg))
-	}
-	return msg, err
+	return store.Set(c.Key, c.Value, c.ExpireTime)
 }
 
 // Get the path for http request
@@ -118,15 +112,7 @@ func (c *DeleteCommand) CommandName() string {
 
 // Delete the key 
 func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
-	res := store.Delete(c.Key)
-
-	msg, err := json.Marshal(res)
-
-	if err == nil && web.HubOpen(){
-		web.Hub().Send(string(msg))
-	}
-	
-	return msg, err
+	return store.Delete(c.Key)
 }
 
 func (c *DeleteCommand) GeneratePath() string{

+ 20 - 2
raftd.go

@@ -28,6 +28,9 @@ var verbose bool
 var leaderHost string
 var address string
 var webPort int
+var cert string
+var key string
+var CAFile string
 
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
@@ -63,6 +66,8 @@ type Info struct {
 var server *raft.Server
 var logger *log.Logger
 
+var storeMsg chan string
+
 //------------------------------------------------------------------------------
 //
 // Functions
@@ -106,7 +111,9 @@ func main() {
 
 	// Setup new raft server.
 	s := store.GetStore()
+
 	server, err = raft.NewServer(name, path, t, s, nil)
+
 	if err != nil {
 		fatal("%v", err)
 	}
@@ -168,9 +175,10 @@ func main() {
 
     if webPort != -1 {
     	// start web
-    	
+    	s.SetMessager(&storeMsg)
+    	go webHelper()
     	go web.Start(server, webPort)
-    }
+    } 
 
     // listen on http port
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
@@ -255,6 +263,16 @@ func Join(s *raft.Server, serverName string) error {
 	}
 	return fmt.Errorf("raftd: Unable to join: %v", err)
 }
+//--------------------------------------
+// Web Helper
+//--------------------------------------
+
+func webHelper() {
+	storeMsg = make(chan string)
+	for {
+		web.Hub().Send(<-storeMsg)
+	}
+}
 
 
 //--------------------------------------

+ 50 - 10
store/store.go

@@ -17,6 +17,7 @@ const (
 
 type Store struct {
 	Nodes map[string]Node  `json:"nodes"`
+	messager *chan string
 }
 
 type Node struct {
@@ -40,6 +41,7 @@ var s *Store
 
 func init() {
 	s = createStore()
+	s.messager = nil
 }
 
 // make a new stroe
@@ -53,8 +55,12 @@ func GetStore() *Store {
 	return s
 }
 
+func (s *Store)SetMessager(messager *chan string) {
+	s.messager = messager
+}	
+
 // set the key to value, return the old value if the key exists 
-func Set(key string, value string, expireTime time.Time) Response {
+func Set(key string, value string, expireTime time.Time) ([]byte, error) {
 
 	key = path.Clean(key)
 
@@ -75,12 +81,12 @@ func Set(key string, value string, expireTime time.Time) Response {
 		//update := make(chan time.Time)
 		//s.Nodes[key] = Node{value, expireTime, update}
 
-		node.ExpireTime = expireTime
-		node.Value = value
-		notify(SET, key, node.Value, value, true)
+		
+		
 		// if node is not permanent before 
 		// update its expireTime
 		if !node.ExpireTime.Equal(time.Unix(0,0)) {
+
 				node.update <- expireTime
 
 		} else {
@@ -94,21 +100,44 @@ func Set(key string, value string, expireTime time.Time) Response {
 			}
 		}
 
-		return Response{SET, key, node.Value, value, true, expireTime}
+		node.ExpireTime = expireTime
+
+		node.Value = value
+		notify(SET, key, node.Value, value, true)
+		
+		msg, err := json.Marshal(Response{SET, key, node.Value, value, true, expireTime})
+
+		// notify the web interface
+		if (s.messager != nil && err == nil) {
+
+			*s.messager <- string(msg)
+		} 
+
+		return msg, err
 
 	} else {
 
+		// add new node
 		update := make(chan time.Time)
 
 		s.Nodes[key] = Node{value, expireTime, update}
 
+		// nofity the watcher
 		notify(SET, key, "", value, false)
 
 		if isExpire {
 			go expire(key, update, expireTime)
 		}
-		
-		return Response{SET, key, "", value, false, time.Unix(0, 0)}
+
+		msg, err := json.Marshal(Response{SET, key, "", value, false, expireTime})
+
+		// notify the web interface
+		if (s.messager != nil && err == nil) {
+
+			*s.messager <- string(msg)
+		} 
+
+		return msg, err
 	}
 }
 
@@ -148,7 +177,7 @@ func Get(key string) Response {
 }
 
 // delete the key, return the old value if the key exists
-func Delete(key string) Response {
+func Delete(key string) ([]byte, error) {
 	key = path.Clean(key)
 
 	node, ok := s.Nodes[key]
@@ -158,9 +187,20 @@ func Delete(key string) Response {
 
 		notify(DELETE, key, node.Value, "", true)
 
-		return Response{DELETE, key, node.Value, "", true, node.ExpireTime}
+		msg, err := json.Marshal(Response{DELETE, key, node.Value, "", true, node.ExpireTime})
+
+		// notify the web interface
+		if (s.messager != nil && err == nil) {
+
+			*s.messager <- string(msg)
+		} 
+
+		return msg, err
+
 	} else {
-		return Response{DELETE, key, "", "", false, time.Unix(0, 0)}
+		// no notify to the watcher and web interface
+
+		return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)})
 	}
 }
 

+ 0 - 87
web/home.html

@@ -1,87 +0,0 @@
-<html>
-<head>
-<title>Alpaca Web Interface</title>
-<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
-<script type="text/javascript">
-    $(function() {
-
-    var conn;
-    var msg = $("#msg");
-    var log = $("#log");
-
-    function appendLog(msg) {
-        var d = log[0]
-        var doScroll = d.scrollTop == d.scrollHeight - d.clientHeight;
-        msg.appendTo(log)
-        if (doScroll) {
-            d.scrollTop = d.scrollHeight - d.clientHeight;
-        }
-    }
-
-    $("#form").submit(function() {
-        if (!conn) {
-            return false;
-        }
-        if (!msg.val()) {
-            return false;
-        }
-        conn.send(msg.val());
-        msg.val("");
-        return false
-    });
-
-    if (window["WebSocket"]) {
-        conn = new WebSocket("ws://{{$}}/ws");
-        conn.onclose = function(evt) {
-            appendLog($("<div><b>Connection closed.</b></div>"))
-        }
-        conn.onmessage = function(evt) {
-            appendLog($("<div/>").text(evt.data))
-        }
-    } else {
-        appendLog($("<div><b>Your browser does not support WebSockets.</b></div>"))
-    }
-    });
-</script>
-<style type="text/css">
-html {
-    overflow: hidden;
-}
-
-body {
-    overflow: hidden;
-    padding: 0;
-    margin: 0;
-    width: 100%;
-    height: 100%;
-    background: gray;
-}
-
-#log {
-    background: white;
-    margin: 0;
-    padding: 0.5em 0.5em 0.5em 0.5em;
-    position: absolute;
-    top: 0.5em;
-    left: 0.5em;
-    right: 0.5em;
-    bottom: 3em;
-    overflow: auto;
-}
-
-#form {
-    padding: 0 0.5em 0 0.5em;
-    margin: 0;
-    position: absolute;
-    bottom: 1em;
-    left: 0px;
-    width: 100%;
-    overflow: hidden;
-}
-
-</style>
-</head>
-<body>
-<div id="log"></div>
-</body>
-</html>

+ 0 - 2
web/web.go

@@ -38,8 +38,6 @@ func handler(w http.ResponseWriter, r *http.Request) {
         }
     }
 
-    time.Sleep(10 * time.Second)
-
 }
 
 var mainTempl = template.Must(template.ParseFiles("home.html"))