Browse Source

Merge pull request #86 from xiangli-cmu/master

Change snapshot to clientside
Xiang Li 12 years ago
parent
commit
21c658b151
11 changed files with 114 additions and 95 deletions
  1. 27 24
      etcd.go
  2. 10 29
      etcd_handlers.go
  3. 2 2
      etcd_long_test.go
  4. 0 19
      machines.go
  5. 7 7
      raft_handlers.go
  6. 36 0
      snapshot.go
  7. 9 1
      store/stats.go
  8. 1 1
      test.go
  9. 1 10
      third_party/github.com/coreos/go-raft/server.go
  10. 0 2
      transporter.go
  11. 21 0
      util.go

+ 27 - 24
etcd.go

@@ -87,14 +87,14 @@ func init() {
 }
 
 const (
-	ELECTIONTIMEOUT  = 200 * time.Millisecond
-	HEARTBEATTIMEOUT = 50 * time.Millisecond
+	ElectionTimeout  = 200 * time.Millisecond
+	HeartbeatTimeout = 50 * time.Millisecond
 
 	// Timeout for internal raft http connection
 	// The original timeout for http is 45 seconds
 	// which is too long for our usage.
-	HTTPTIMEOUT   = 10 * time.Second
-	RETRYINTERVAL = 10
+	HTTPTimeout   = 10 * time.Second
+	RetryInterval = 10
 )
 
 //------------------------------------------------------------------------------
@@ -120,6 +120,12 @@ type Info struct {
 	EtcdTLS TLSInfo `json:"etcdTLS"`
 }
 
+type TLSConfig struct {
+	Scheme string
+	Server tls.Config
+	Client tls.Config
+}
+
 //------------------------------------------------------------------------------
 //
 // Variables
@@ -234,6 +240,7 @@ func main() {
 
 	// Create etcd key-value store
 	etcdStore = store.CreateStore(maxSize)
+	snapConf = newSnapshotConf()
 
 	startRaft(raftTLSConfig)
 
@@ -275,8 +282,8 @@ func startRaft(tlsConfig TLSConfig) {
 		}
 	}
 
-	raftServer.SetElectionTimeout(ELECTIONTIMEOUT)
-	raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
+	raftServer.SetElectionTimeout(ElectionTimeout)
+	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
 
 	raftServer.Start()
 
@@ -313,7 +320,7 @@ func startRaft(tlsConfig TLSConfig) {
 					if len(machine) == 0 {
 						continue
 					}
-					err = joinCluster(raftServer, machine)
+					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
 					if err != nil {
 						if err.Error() == errors[103] {
 							fmt.Println(err)
@@ -330,8 +337,8 @@ func startRaft(tlsConfig TLSConfig) {
 					break
 				}
 
-				warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
-				time.Sleep(time.Second * RETRYINTERVAL)
+				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
+				time.Sleep(time.Second * RetryInterval)
 			}
 			if err != nil {
 				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
@@ -346,7 +353,7 @@ func startRaft(tlsConfig TLSConfig) {
 
 	// open the snapshot
 	if snapshot {
-		go raftServer.Snapshot()
+		go monitorSnapshot()
 	}
 
 	// start to response to raft requests
@@ -360,10 +367,8 @@ func startRaft(tlsConfig TLSConfig) {
 func newTransporter(scheme string, tlsConf tls.Config) transporter {
 	t := transporter{}
 
-	t.scheme = scheme
-
 	tr := &http.Transport{
-		Dial:               dialTimeout,
+		Dial: dialTimeout,
 	}
 
 	if scheme == "https" {
@@ -378,7 +383,7 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter {
 
 // Dial with timeout
 func dialTimeout(network, addr string) (net.Conn, error) {
-	return net.DialTimeout(network, addr, HTTPTIMEOUT)
+	return net.DialTimeout(network, addr, HTTPTimeout)
 }
 
 // Start to listen and response raft command
@@ -445,12 +450,6 @@ func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
 // Config
 //--------------------------------------
 
-type TLSConfig struct {
-	Scheme string
-	Server tls.Config
-	Client tls.Config
-}
-
 func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
 	var keyFile, certFile, CAFile string
 	var tlsCert tls.Certificate
@@ -551,7 +550,11 @@ func getInfo(path string) *Info {
 	return info
 }
 
-// Create client auth certpool
+// newCertPool creates x509 certPool and corresponding Auth Type.
+// If the given CAfile is valid, add the cert into the pool and verify the clients'
+// certs against the cert in the pool.
+// If the given CAfile is empty, do not verify the clients' cert.
+// If the given CAfile is not valid, fatal.
 func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
 	if CAFile == "" {
 		return tls.NoClientCert, nil
@@ -574,7 +577,7 @@ func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
 }
 
 // Send join requests to the leader.
-func joinCluster(s *raft.Server, raftURL string) error {
+func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 	var b bytes.Buffer
 
 	command := &JoinCommand{
@@ -592,10 +595,10 @@ func joinCluster(s *raft.Server, raftURL string) error {
 		panic("wrong type")
 	}
 
-	joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
+	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
 
 	debugf("Send Join Request to %s", raftURL)
-	
+
 	resp, err := t.Post(joinURL.String(), &b)
 
 	for {

+ 10 - 29
etcd_handlers.go

@@ -3,9 +3,9 @@ package main
 import (
 	"fmt"
 	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
 	"net/http"
 	"strconv"
-	"time"
 )
 
 //-------------------------------------------------------------------
@@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] POST %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
 
 	value := req.FormValue("value")
 
@@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] DELETE %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
 
 	command := &DeleteCommand{
 		Key: key,
@@ -107,8 +107,9 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 
 // Dispatch the command to leader
 func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
-	if raftServer.State() == "leader" {
+	if raftServer.State() == raft.Leader {
 		if body, err := raftServer.Do(c); err != nil {
+
 			if _, ok := err.(store.NotFoundError); ok {
 				(*w).WriteHeader(http.StatusNotFound)
 				(*w).Write(newJsonError(100, err.Error()))
@@ -162,12 +163,6 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
 		path := req.URL.Path
 
-		var scheme string
-
-		if scheme = req.URL.Scheme; scheme == "" {
-			scheme = "http://"
-		}
-
 		var url string
 
 		if etcd {
@@ -217,14 +212,14 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 	// Add itself to the machine list first
 	// Since peer map does not contain the server itself
-	machines, _ := getEtcdURL(raftServer.Name())
+	machines := info.EtcdURL
 
 	// Add all peers to the list and separate by comma
 	// We do not use json here since we accept machines list
 	// in the command line separate by comma.
 
 	for peerName, _ := range peers {
-		if addr, ok := getEtcdURL(peerName); ok {
+		if addr, ok := nameToEtcdURL(peerName); ok {
 			machines = machines + "," + addr
 		}
 	}
@@ -250,7 +245,7 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] GET %s/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
 
 	command := &GetCommand{
 		Key: key,
@@ -289,13 +284,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 
 	if req.Method == "GET" {
-		debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
+		debugf("[recv] GET %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
 		command.SinceIndex = 0
 
 	} else if req.Method == "POST" {
 		// watch from a specific index
 
-		debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
+		debugf("[recv] POST %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
 		content := req.FormValue("index")
 
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@@ -339,17 +334,3 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 	w.WriteHeader(http.StatusBadRequest)
 }
-
-// Convert string duration to time format
-func durationToExpireTime(strDuration string) (time.Time, error) {
-	if strDuration != "" {
-		duration, err := strconv.Atoi(strDuration)
-
-		if err != nil {
-			return time.Unix(0, 0), err
-		}
-		return time.Now().Add(time.Second * (time.Duration)(duration)), nil
-	} else {
-		return time.Unix(0, 0), nil
-	}
-}

+ 2 - 2
etcd_long_test.go

@@ -57,8 +57,8 @@ func TestKillLeader(t *testing.T) {
 		totalTime += take
 		avgTime := totalTime / (time.Duration)(i+1)
 
-		fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMEOUT)
-		fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMEOUT)
+		fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout)
+		fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
 		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
 	}
 }

+ 0 - 19
machines.go

@@ -1,24 +1,5 @@
 package main
 
-import (
-	"net/url"
-	"path"
-)
-
-func getEtcdURL(name string) (string, bool) {
-	resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
-
-	m, err := url.ParseQuery(resps[0].Value)
-
-	if err != nil {
-		panic("Failed to parse machines entry")
-	}
-
-	addr := m["etcd"][0]
-
-	return addr, true
-}
-
 // machineNum returns the number of machines in the cluster
 func machineNum() int {
 	response, _ := etcdStore.RawGet("_etcd/machines")

+ 7 - 7
raft_handlers.go

@@ -12,7 +12,7 @@ import (
 
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name())
+	debugf("[recv] GET %s/log", info.RaftURL)
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(raftServer.LogEntries())
@@ -23,7 +23,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
-		debugf("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName)
+		debugf("[recv] POST %s/vote [%s]", info.RaftURL, rvreq.CandidateName)
 		if resp := raftServer.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
@@ -40,7 +40,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 
 	if err == nil {
-		debugf("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries))
+		debugf("[recv] POST %s/log/append [%d]", info.RaftURL, len(aereq.Entries))
 		if resp := raftServer.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
@@ -59,7 +59,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name())
+		debugf("[recv] POST %s/snapshot/ ", info.RaftURL)
 		if resp := raftServer.RequestSnapshot(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
@@ -75,7 +75,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRecoveryRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name())
+		debugf("[recv] POST %s/snapshotRecovery/ ", info.RaftURL)
 		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
@@ -88,7 +88,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Get the port that listening for etcd connecting of the server
 func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name())
+	debugf("[recv] Get %s/etcdURL/ ", info.RaftURL)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(argInfo.EtcdURL))
 }
@@ -109,7 +109,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Response to the name request
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name())
+	debugf("[recv] Get %s/name/ ", info.RaftURL)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(raftServer.Name()))
 }

+ 36 - 0
snapshot.go

@@ -0,0 +1,36 @@
+package main
+
+import (
+	"time"
+)
+
+// basic conf.
+// TODO: find a good policy to do snapshot
+type snapshotConf struct {
+	// Etcd will check if snapshot is need every checkingInterval
+	checkingInterval time.Duration
+	// The number of writes when the last snapshot happened
+	lastWrites uint64
+	// If the incremental number of writes since the last snapshot
+	// exceeds the write Threshold, etcd will do a snapshot
+	writesThr uint64
+}
+
+var snapConf *snapshotConf
+
+func newSnapshotConf() *snapshotConf {
+	// check snapshot every 3 seconds and the threshold is 20K
+	return &snapshotConf{time.Second * 3, etcdStore.TotalWrites(), 20 * 1000}
+}
+
+func monitorSnapshot() {
+	for {
+		time.Sleep(snapConf.checkingInterval)
+		currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
+
+		if currentWrites > snapConf.writesThr {
+			raftServer.TakeSnapshot()
+			snapConf.lastWrites = etcdStore.TotalWrites()
+		}
+	}
+}

+ 9 - 1
store/stats.go

@@ -18,8 +18,16 @@ type EtcdStats struct {
 	TestAndSets uint64 `json:"testAndSets"`
 }
 
-// Stats returns the basic statistics information of etcd storage
+// Stats returns the basic statistics information of etcd storage since its recent start
 func (s *Store) Stats() []byte {
 	b, _ := json.Marshal(s.BasicStats)
 	return b
 }
+
+// TotalWrites returns the total write operations
+// It helps with snapshot
+func (s *Store) TotalWrites() uint64 {
+	bs := s.BasicStats
+
+	return bs.Deletes + bs.Sets + bs.TestAndSets
+}

+ 1 - 1
test.go

@@ -70,7 +70,7 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 
 	for i := 0; i < size; i++ {
 		if i == 0 {
-			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"}
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
 			if ssl {
 				argGroup[i] = append(argGroup[i], sslServer1...)
 			}

+ 1 - 10
third_party/github.com/coreos/go-raft/server.go

@@ -1025,16 +1025,7 @@ func (s *Server) RemovePeer(name string) error {
 // Log compaction
 //--------------------------------------
 
-// The background snapshot function
-func (s *Server) Snapshot() {
-	for {
-		// TODO: change this... to something reasonable
-		time.Sleep(1 * time.Second)
-		s.takeSnapshot()
-	}
-}
-
-func (s *Server) takeSnapshot() error {
+func (s *Server) TakeSnapshot() error {
 	//TODO put a snapshot mutex
 	s.debugln("take Snapshot")
 	if s.currentSnapshot != nil {

+ 0 - 2
transporter.go

@@ -12,8 +12,6 @@ import (
 // Transporter layer for communication between raft nodes
 type transporter struct {
 	client *http.Client
-	// scheme
-	scheme string
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.

+ 21 - 0
util.go

@@ -8,8 +8,29 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"strconv"
+	"time"
 )
 
+//--------------------------------------
+// etcd http Helper
+//--------------------------------------
+
+// Convert string duration to time format
+func durationToExpireTime(strDuration string) (time.Time, error) {
+	if strDuration != "" {
+		duration, err := strconv.Atoi(strDuration)
+
+		if err != nil {
+			return time.Unix(0, 0), err
+		}
+		return time.Now().Add(time.Second * (time.Duration)(duration)), nil
+
+	} else {
+		return time.Unix(0, 0), nil
+	}
+}
+
 //--------------------------------------
 // Web Helper
 //--------------------------------------