Browse Source

use xiangli-cmu/go-raft as the raft lib

Xiang Li 12 years ago
parent
commit
e91ea7be38
5 changed files with 37 additions and 17 deletions
  1. 7 7
      command.go
  2. 24 2
      handlers.go
  3. 4 6
      raftd.go
  4. 1 1
      trans_handler.go
  5. 1 1
      web/web.go

+ 7 - 7
command.go

@@ -8,7 +8,7 @@ package main
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	"github.com/benbjohnson/go-raft"
+	"github.com/xiangli-cmu/go-raft"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"time"
 	"time"
 )
 )
@@ -16,7 +16,7 @@ import (
 // A command represents an action to be taken on the replicated state machine.
 // A command represents an action to be taken on the replicated state machine.
 type Command interface {
 type Command interface {
 	CommandName() string
 	CommandName() string
-	Apply(server *raft.Server) ([]byte, error)
+	Apply(server *raft.Server) (interface {}, error)
 }
 }
 
 
 // Set command
 // Set command
@@ -32,7 +32,7 @@ func (c *SetCommand) CommandName() string {
 }
 }
 
 
 // Set the value of key to value
 // Set the value of key to value
-func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *SetCommand) Apply(server *raft.Server) (interface {}, error) {
 	return store.Set(c.Key, c.Value, c.ExpireTime)
 	return store.Set(c.Key, c.Value, c.ExpireTime)
 }
 }
 
 
@@ -52,7 +52,7 @@ func (c *GetCommand) CommandName() string {
 }
 }
 
 
 // Set the value of key to value
 // Set the value of key to value
-func (c *GetCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *GetCommand) Apply(server *raft.Server) (interface {}, error) {
 	res := store.Get(c.Key)
 	res := store.Get(c.Key)
 	return json.Marshal(res)
 	return json.Marshal(res)
 }
 }
@@ -72,7 +72,7 @@ func (c *DeleteCommand) CommandName() string {
 }
 }
 
 
 // Delete the key
 // Delete the key
-func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *DeleteCommand) Apply(server *raft.Server) (interface {}, error) {
 	return store.Delete(c.Key)
 	return store.Delete(c.Key)
 }
 }
 
 
@@ -86,7 +86,7 @@ func (c *WatchCommand) CommandName() string {
 	return "watch"
 	return "watch"
 }
 }
 
 
-func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *WatchCommand) Apply(server *raft.Server) (interface {}, error) {
 	ch := make(chan store.Response)
 	ch := make(chan store.Response)
 
 
 	// add to the watchers list
 	// add to the watchers list
@@ -107,7 +107,7 @@ func (c *JoinCommand) CommandName() string {
 	return "join"
 	return "join"
 }
 }
 
 
-func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *JoinCommand) Apply(server *raft.Server) (interface {}, error) {
 	err := server.AddPeer(c.Name)
 	err := server.AddPeer(c.Name)
 	// no result will be returned
 	// no result will be returned
 	return nil, err
 	return nil, err

+ 24 - 2
handlers.go

@@ -2,7 +2,7 @@ package main
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	"github.com/benbjohnson/go-raft"
+	"github.com/xiangli-cmu/go-raft"
 	"net/http"
 	"net/http"
 	//"fmt"
 	//"fmt"
 	"io/ioutil"
 	"io/ioutil"
@@ -134,11 +134,21 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
 func excute(c Command, w *http.ResponseWriter) {
 func excute(c Command, w *http.ResponseWriter) {
 	if server.State() == "leader" {
 	if server.State() == "leader" {
 		if body, err := server.Do(c); err != nil {
 		if body, err := server.Do(c); err != nil {
-			warn("raftd: Unable to write file: %v", err)
+			warn("Commit failed %v", err)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			return
 			return
 		} else {
 		} else {
 			(*w).WriteHeader(http.StatusOK)
 			(*w).WriteHeader(http.StatusOK)
+
+			if body == nil {
+				return 
+			}
+
+			body, ok := body.([]byte)
+			if !ok {
+				panic ("wrong type")
+			}
+
 			(*w).Write(body)
 			(*w).Write(body)
 			return
 			return
 		}
 		}
@@ -174,6 +184,12 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 		return
 	} else {
 	} else {
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
+
+		body, ok := body.([]byte)
+		if !ok {
+			panic ("wrong type")
+		}
+
 		w.Write(body)
 		w.Write(body)
 		return
 		return
 	}
 	}
@@ -194,6 +210,12 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 		return
 	} else {
 	} else {
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
+
+		body, ok := body.([]byte)
+		if !ok {
+			panic ("wrong type")
+		}
+		
 		w.Write(body)
 		w.Write(body)
 		return
 		return
 	}
 	}

+ 4 - 6
raftd.go

@@ -8,7 +8,7 @@ import (
 	"encoding/pem"
 	"encoding/pem"
 	"flag"
 	"flag"
 	"fmt"
 	"fmt"
-	"github.com/benbjohnson/go-raft"
+	"github.com/xiangli-cmu/go-raft"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"github.com/xiangli-cmu/raft-etcd/web"
 	"github.com/xiangli-cmu/raft-etcd/web"
 	"io"
 	"io"
@@ -53,8 +53,8 @@ const (
 )
 )
 
 
 const (
 const (
-	ELECTIONTIMTOUT  = 3 * time.Second
-	HEARTBEATTIMEOUT = 1 * time.Second
+	ELECTIONTIMTOUT  = 200 * time.Millisecond
+	HEARTBEATTIMEOUT = 50 * time.Millisecond
 )
 )
 
 
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
@@ -149,7 +149,6 @@ func main() {
 
 
 		// start as a leader in a new cluster
 		// start as a leader in a new cluster
 		if leaderHost == "" {
 		if leaderHost == "" {
-			server.StartHeartbeatTimeout()
 			server.StartLeader()
 			server.StartLeader()
 
 
 			// join self as a peer
 			// join self as a peer
@@ -160,7 +159,6 @@ func main() {
 
 
 			// start as a fellower in a existing cluster
 			// start as a fellower in a existing cluster
 		} else {
 		} else {
-			server.StartElectionTimeout()
 			server.StartFollower()
 			server.StartFollower()
 
 
 			err := Join(server, leaderHost)
 			err := Join(server, leaderHost)
@@ -172,7 +170,6 @@ func main() {
 
 
 		// rejoin the previous cluster
 		// rejoin the previous cluster
 	} else {
 	} else {
-		server.StartElectionTimeout()
 		server.StartFollower()
 		server.StartFollower()
 		debug("%s start as a follower", server.Name())
 		debug("%s start as a follower", server.Name())
 	}
 	}
@@ -248,6 +245,7 @@ func startTransport(port int, st int) {
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
+		debug("%s listen on http", server.Name())
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:

+ 1 - 1
trans_handler.go

@@ -4,7 +4,7 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/benbjohnson/go-raft"
+	"github.com/xiangli-cmu/go-raft"
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 )
 )

+ 1 - 1
web/web.go

@@ -3,7 +3,7 @@ package web
 import (
 import (
 	"code.google.com/p/go.net/websocket"
 	"code.google.com/p/go.net/websocket"
 	"fmt"
 	"fmt"
-	"github.com/benbjohnson/go-raft"
+	"github.com/xiangli-cmu/go-raft"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"github.com/xiangli-cmu/raft-etcd/store"
 	"html/template"
 	"html/template"
 	"net/http"
 	"net/http"