Browse Source

Merge pull request #45 from ccding/master

revise log handling
Xiang Li 12 years ago
parent
commit
c4d2834ebb
6 changed files with 70 additions and 56 deletions
  1. 7 7
      client_handlers.go
  2. 25 26
      etcd.go
  3. 12 12
      raft_handlers.go
  4. 1 1
      store/keywords.go
  5. 6 6
      transporter.go
  6. 19 4
      util.go

+ 7 - 7
client_handlers.go

@@ -43,7 +43,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
 
 	value := req.FormValue("value")
 
@@ -90,7 +90,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
 
 	command := &DeleteCommand{}
 	command.Key = key
@@ -170,7 +170,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 			url = scheme + raftServer.Leader() + path
 		}
 
-		debug("Redirect to %s", url)
+		debugf("Redirect to %s", url)
 
 		http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
 		return
@@ -229,7 +229,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
 
 	command := &GetCommand{}
 	command.Key = key
@@ -266,13 +266,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	command.Key = key
 
 	if req.Method == "GET" {
-		debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
+		debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
 		command.SinceIndex = 0
 
 	} else if req.Method == "POST" {
 		// watch from a specific index
 
-		debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
+		debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
 		content := req.FormValue("index")
 
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@@ -288,7 +288,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 
 	if body, err := command.Apply(raftServer); err != nil {
-		warn("Unable to do watch command: %v", err)
+		warnf("Unable to do watch command: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 	} else {
 		w.WriteHeader(http.StatusOK)

+ 25 - 26
etcd.go

@@ -12,7 +12,6 @@ import (
 	"github.com/coreos/etcd/web"
 	"github.com/coreos/go-raft"
 	"io/ioutil"
-	"log"
 	"net"
 	"net/http"
 	"os"
@@ -169,7 +168,7 @@ func main() {
 	if cpuprofile != "" {
 		f, err := os.Create(cpuprofile)
 		if err != nil {
-			log.Fatal(err)
+			fatal(err)
 		}
 		pprof.StartCPUProfile(f)
 		defer pprof.StopCPUProfile()
@@ -196,7 +195,7 @@ func main() {
 	} else if machinesFile != "" {
 		b, err := ioutil.ReadFile(machinesFile)
 		if err != nil {
-			fatal("Unable to read the given machines file: %s", err)
+			fatalf("Unable to read the given machines file: %s", err)
 		}
 		cluster = strings.Split(string(b), ",")
 	}
@@ -206,7 +205,7 @@ func main() {
 
 	// Read server info from file or grab it from user.
 	if err := os.MkdirAll(dirPath, 0744); err != nil {
-		fatal("Unable to create path: %s", err)
+		fatalf("Unable to create path: %s", err)
 	}
 
 	info = getInfo(dirPath)
@@ -249,7 +248,7 @@ func startRaft(securityType int) {
 	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
 
 	if err != nil {
-		fatal(fmt.Sprintln(err))
+		fatal(err)
 	}
 
 	// LoadSnapshot
@@ -257,9 +256,9 @@ func startRaft(securityType int) {
 		err = raftServer.LoadSnapshot()
 
 		if err == nil {
-			debug("%s finished load snapshot", raftServer.Name())
+			debugf("%s finished load snapshot", raftServer.Name())
 		} else {
-			debug(err.Error())
+			debug(err)
 		}
 	}
 
@@ -290,7 +289,7 @@ func startRaft(securityType int) {
 					break
 				}
 			}
-			debug("%s start as a leader", raftServer.Name())
+			debugf("%s start as a leader", raftServer.Name())
 
 			// start as a follower in a existing cluster
 		} else {
@@ -310,7 +309,7 @@ func startRaft(securityType int) {
 							fmt.Println(err)
 							os.Exit(1)
 						}
-						debug("cannot join to cluster via machine %s %s", machine, err)
+						debugf("cannot join to cluster via machine %s %s", machine, err)
 					} else {
 						success = true
 						break
@@ -321,18 +320,18 @@ func startRaft(securityType int) {
 					break
 				}
 
-				warn("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
+				warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
 				time.Sleep(time.Second * RETRYINTERVAL)
 			}
 			if err != nil {
-				fatal("Cannot join the cluster via given machines after %x retries", retryTimes)
+				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
 			}
-			debug("%s success join to the cluster", raftServer.Name())
+			debugf("%s success join to the cluster", raftServer.Name())
 		}
 
 	} else {
 		// rejoin the previous cluster
-		debug("%s restart as a follower", raftServer.Name())
+		debugf("%s restart as a follower", raftServer.Name())
 	}
 
 	// open the snapshot
@@ -368,7 +367,7 @@ func createTransporter(st int) transporter {
 		tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
 
 		if err != nil {
-			fatal(fmt.Sprintln(err))
+			fatal(err)
 		}
 
 		tr := &http.Transport{
@@ -407,11 +406,11 @@ func startRaftTransport(port int, st int) {
 
 	case HTTP:
 		fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port)
-		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
+		fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 	case HTTPS:
 		fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
-		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
+		fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 
 	case HTTPSANDVERIFY:
 
@@ -423,7 +422,7 @@ func startRaftTransport(port int, st int) {
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
-		log.Fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile), nil)
+		fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile))
 	}
 
 }
@@ -440,7 +439,7 @@ func startClientTransport(port int, st int) {
 
 	case HTTP:
 		fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort)
-		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
+		fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 	case HTTPS:
 		fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
@@ -456,7 +455,7 @@ func startClientTransport(port int, st int) {
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
-		log.Fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile), nil)
+		fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile))
 	}
 }
 
@@ -529,10 +528,10 @@ func getInfo(path string) *Info {
 
 	if file, err := os.Open(infoPath); err == nil {
 		if content, err := ioutil.ReadAll(file); err != nil {
-			fatal("Unable to read info: %v", err)
+			fatalf("Unable to read info: %v", err)
 		} else {
 			if err = json.Unmarshal(content, &info); err != nil {
-				fatal("Unable to parse info: %v", err)
+				fatalf("Unable to parse info: %v", err)
 			}
 		}
 		file.Close()
@@ -564,7 +563,7 @@ func getInfo(path string) *Info {
 		content, _ := json.Marshal(info)
 		content = []byte(string(content) + "\n")
 		if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
-			fatal("Unable to write info to file: %v", err)
+			fatalf("Unable to write info to file: %v", err)
 		}
 	}
 
@@ -580,7 +579,7 @@ func createCertPool(CAFile string) *x509.CertPool {
 	cert, err := x509.ParseCertificate(block.Bytes)
 
 	if err != nil {
-		fatal(fmt.Sprintln(err))
+		fatal(err)
 	}
 
 	certPool := x509.NewCertPool()
@@ -609,7 +608,7 @@ func joinCluster(s *raft.Server, serverName string) error {
 		panic("wrong type")
 	}
 
-	debug("Send Join Request to %s", serverName)
+	debugf("Send Join Request to %s", serverName)
 
 	resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
 
@@ -624,8 +623,8 @@ func joinCluster(s *raft.Server, serverName string) error {
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
 				address := resp.Header.Get("Location")
-				debug("Leader is %s", address)
-				debug("Send Join Request to %s", address)
+				debugf("Leader is %s", address)
+				debugf("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 			} else if resp.StatusCode == http.StatusBadRequest {

+ 12 - 12
raft_handlers.go

@@ -13,7 +13,7 @@ import (
 
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name())
+	debugf("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name())
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(raftServer.LogEntries())
@@ -24,14 +24,14 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
-		debug("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName)
+		debugf("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName)
 		if resp := raftServer.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
 		}
 	}
-	warn("[vote] ERROR: %v", err)
+	warnf("[vote] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 
@@ -41,17 +41,17 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 
 	if err == nil {
-		debug("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries))
+		debugf("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries))
 		if resp := raftServer.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
-				debug("[Append Entry] Step back")
+				debugf("[Append Entry] Step back")
 			}
 			return
 		}
 	}
-	warn("[Append Entry] ERROR: %v", err)
+	warnf("[Append Entry] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 
@@ -60,14 +60,14 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debug("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name())
+		debugf("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name())
 		if resp := raftServer.RequestSnapshot(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
 		}
 	}
-	warn("[Snapshot] ERROR: %v", err)
+	warnf("[Snapshot] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 
@@ -76,20 +76,20 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRecoveryRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debug("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name())
+		debugf("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name())
 		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
 		}
 	}
-	warn("[Snapshot] ERROR: %v", err)
+	warnf("[Snapshot] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Get the port that listening for client connecting of the server
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
+	debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
 	w.WriteHeader(http.StatusOK)
 	client := hostname + ":" + strconv.Itoa(clientPort)
 	w.Write([]byte(client))
@@ -101,7 +101,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 	command := &JoinCommand{}
 
 	if err := decodeJsonRequest(req, command); err == nil {
-		debug("Receive Join Request from %s", command.Name)
+		debugf("Receive Join Request from %s", command.Name)
 		dispatch(command, &w, req, false)
 	} else {
 		w.WriteHeader(http.StatusInternalServerError)

+ 1 - 1
store/keywords.go

@@ -8,7 +8,7 @@ import (
 // keywords for internal useage
 // Key for string keyword; Value for only checking prefix
 var keywords = map[string]bool{
-	"/_etcd":       true,
+	"/_etcd":          true,
 	"/ephemeralNodes": true,
 }
 

+ 6 - 6
transporter.go

@@ -23,12 +23,12 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debug("Send LogEntries to %s ", peer.Name())
+	debugf("Send LogEntries to %s ", peer.Name())
 
 	resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
 
 	if err != nil {
-		debug("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err)
+		debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err)
 	}
 
 	if resp != nil {
@@ -48,12 +48,12 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debug("Send Vote to %s", peer.Name())
+	debugf("Send Vote to %s", peer.Name())
 
 	resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
 
 	if err != nil {
-		debug("Cannot send VoteRequest to %s : %s", peer.Name(), err)
+		debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err)
 	}
 
 	if resp != nil {
@@ -73,7 +73,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
 		req.LastTerm, req.LastIndex)
 
 	resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
@@ -95,7 +95,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debug("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
 		req.LastTerm, req.LastIndex)
 
 	resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)

+ 19 - 4
util.go

@@ -31,7 +31,7 @@ func webHelper() {
 func decodeJsonRequest(req *http.Request, data interface{}) error {
 	decoder := json.NewDecoder(req.Body)
 	if err := decoder.Decode(&data); err != nil && err != io.EOF {
-		warn("Malformed json request: %v", err)
+		warnf("Malformed json request: %v", err)
 		return fmt.Errorf("Malformed json request: %v", err)
 	}
 	return nil
@@ -57,17 +57,32 @@ func init() {
 	logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds)
 }
 
-func debug(msg string, v ...interface{}) {
+func debugf(msg string, v ...interface{}) {
 	if verbose {
 		logger.Printf("DEBUG "+msg+"\n", v...)
 	}
 }
 
-func warn(msg string, v ...interface{}) {
+func debug(v ...interface{}) {
+	if verbose {
+		logger.Println("DEBUG " + fmt.Sprint(v...))
+	}
+}
+
+func warnf(msg string, v ...interface{}) {
 	logger.Printf("WARN  "+msg+"\n", v...)
 }
 
-func fatal(msg string, v ...interface{}) {
+func warn(v ...interface{}) {
+	logger.Println("WARN " + fmt.Sprint(v...))
+}
+
+func fatalf(msg string, v ...interface{}) {
 	logger.Printf("FATAL "+msg+"\n", v...)
 	os.Exit(1)
 }
+
+func fatal(v ...interface{}) {
+	logger.Println("FATAL " + fmt.Sprint(v...))
+	os.Exit(1)
+}