Browse Source

Merge pull request #774 from unihorn/83

feat(join): check cluster conditions before join
Yicheng Qin 11 years ago
parent
commit
f1c13e2d9d
5 changed files with 266 additions and 94 deletions
  1. 6 1
      error/error.go
  2. 31 9
      etcd/etcd.go
  3. 184 0
      server/client.go
  4. 42 83
      server/peer_server.go
  5. 3 1
      server/peer_server_handlers.go

+ 6 - 1
error/error.go

@@ -58,6 +58,9 @@ var errors = map[int]string{
 	EcodeInvalidActiveSize:   "Invalid active size",
 	EcodeInvalidPromoteDelay: "Standby promote delay",
 	EcodePromoteError:        "Standby promotion error",
+
+	// client related errors
+	EcodeClientInternal: "Client Internal Error",
 }
 
 const (
@@ -92,6 +95,8 @@ const (
 	EcodeInvalidActiveSize   = 403
 	EcodeInvalidPromoteDelay = 404
 	EcodePromoteError        = 405
+
+	EcodeClientInternal = 500
 )
 
 type Error struct {
@@ -116,7 +121,7 @@ func Message(code int) string {
 
 // Only for error interface
 func (e Error) Error() string {
-	return e.Message
+	return e.Message + " (" + e.Cause + ")"
 }
 
 func (e Error) toJsonString() string {

+ 31 - 9
etcd/etcd.go

@@ -28,6 +28,7 @@ import (
 	goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
 	golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log"
 	"github.com/coreos/etcd/third_party/github.com/goraft/raft"
+	httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
 
 	"github.com/coreos/etcd/config"
 	ehttp "github.com/coreos/etcd/http"
@@ -37,6 +38,14 @@ import (
 	"github.com/coreos/etcd/store"
 )
 
+// TODO(yichengq): the constant extraTimeout is a hack.
+// The current problem is that there is a big lag between join command
+// execution and join success.
+// Fix it later. It should be removed once a proper method is found and
+// enough tests are provided. The timeouts are expected to be calculated
+// from heartbeatInterval and electionTimeout only.
+const extraTimeout = time.Duration(1000) * time.Millisecond
+
 type Etcd struct {
 	Config       *config.Config     // etcd config
 	Store        store.Store        // data store
@@ -144,14 +153,27 @@ func (e *Etcd) Run() {
 	// Calculate all of our timeouts
 	heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
 	electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
-	// TODO(yichengq): constant 1000 is a hack here. The reason to use this
-	// is to ensure etcd instances could start successfully at the same time.
-	// Current problem for the failure comes from the lag between join command
-	// execution and join success.
-	// Fix it later. It should be removed when proper method is found and
-	// enough tests are provided.
-	dialTimeout := (3 * heartbeatInterval) + electionTimeout + 1000
-	responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + 1000
+	dialTimeout := (3 * heartbeatInterval) + electionTimeout
+	responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout
+
+	clientTransporter := &httpclient.Transport{
+		ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout,
+		// This is a workaround for Transport.CancelRequest not working on
+		// blocked HTTPS connections. The patch for it is in progress
+		// and should be available in Go 1.3.
+		// More: https://codereview.appspot.com/69280043/
+		ConnectTimeout: dialTimeout + extraTimeout,
+		RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout,
+	}
+	if e.Config.PeerTLSInfo().Scheme() == "https" {
+		clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
+		if err != nil {
+			log.Fatal("client TLS error: ", err)
+		}
+		clientTransporter.TLSClientConfig = clientTLSConfig
+		clientTransporter.DisableCompression = true
+	}
+	client := server.NewClient(clientTransporter)
 
 	// Create peer server
 	psConfig := server.PeerServerConfig{
@@ -162,7 +184,7 @@ func (e *Etcd) Run() {
 		RetryTimes:    e.Config.MaxRetryAttempts,
 		RetryInterval: e.Config.RetryInterval,
 	}
-	e.PeerServer = server.NewPeerServer(psConfig, e.Registry, e.Store, &mb, followersStats, serverStats)
+	e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats)
 
 	// Create raft transporter and server
 	raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout)

+ 184 - 0
server/client.go

@@ -0,0 +1,184 @@
+package server
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strconv"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/log"
+)
+
+// Client sends various requests using the HTTP API.
+// It is different from raft communication, and doesn't record anything in the log.
+// The url argument must contain the scheme and host only, with
+// no trailing slash.
+// Public functions intentionally return "etcd/error".Error so that callers
+// can easily determine the etcd error code.
+// TODO(yichengq): It is similar to go-etcd, but integrating the two
+// would take considerable effort. Leave it for further discussion.
+type Client struct {
+	http.Client
+}
+
+func NewClient(transport http.RoundTripper) *Client {
+	return &Client{http.Client{Transport: transport}}
+}
+
+// CheckVersion checks whether the version is available.
+func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
+	resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
+	if err != nil {
+		return false, clientError(err)
+	}
+
+	defer resp.Body.Close()
+
+	return resp.StatusCode == 200, nil
+}
+
+// GetVersion fetches the peer version of a cluster.
+func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
+	resp, err := c.Get(url + "/version")
+	if err != nil {
+		return 0, clientError(err)
+	}
+
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return 0, clientError(err)
+	}
+
+	// Parse version number.
+	version, err := strconv.Atoi(string(body))
+	if err != nil {
+		return 0, clientError(err)
+	}
+	return version, nil
+}
+
+func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/machines")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	msgs := new([]*machineMessage)
+	if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
+		return nil, uerr
+	}
+	return *msgs, nil
+}
+
+func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/config")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	config := new(ClusterConfig)
+	if uerr := c.parseJSONResponse(resp, config); uerr != nil {
+		return nil, uerr
+	}
+	return config, nil
+}
+
+// AddMachine adds a machine to the cluster.
+// The first return value is the commit index of the join command.
+func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {
+	b, _ := json.Marshal(cmd)
+	url = url + "/join"
+
+	log.Infof("Send Join Request to %s", url)
+	resp, err := c.put(url, b)
+	if err != nil {
+		return 0, clientError(err)
+	}
+	defer resp.Body.Close()
+
+	if err := c.checkErrorResponse(resp); err != nil {
+		return 0, err
+	}
+	b, err = ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return 0, clientError(err)
+	}
+	index, numRead := binary.Uvarint(b)
+	if numRead < 0 {
+		return 0, clientError(fmt.Errorf("buf too small, or value too large"))
+	}
+	return index, nil
+}
+
+func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
+	defer resp.Body.Close()
+
+	if err := c.checkErrorResponse(resp); err != nil {
+		return err
+	}
+	if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
+		log.Debugf("Error parsing join response: %v", err)
+		return clientError(err)
+	}
+	return nil
+}
+
+func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
+	if resp.StatusCode != http.StatusOK {
+		uerr := &etcdErr.Error{}
+		if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
+			log.Debugf("Error parsing response to etcd error: %v", err)
+			return clientError(err)
+		}
+		return uerr
+	}
+	return nil
+}
+
+// put sends server side PUT request.
+// It always follows redirects instead of stopping according to RFC 2616.
+func (c *Client) put(urlStr string, body []byte) (*http.Response, error) {
+	return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
+}
+
+func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
+	var req *http.Request
+
+	for redirect := 0; redirect < 10; redirect++ {
+		req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
+		if err != nil {
+			return
+		}
+
+		if resp, err = c.Do(req); err != nil {
+			if resp != nil {
+				resp.Body.Close()
+			}
+			return
+		}
+
+		if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
+			resp.Body.Close()
+			if urlStr = resp.Header.Get("Location"); urlStr == "" {
+				err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
+				return
+			}
+			continue
+		}
+		return
+	}
+
+	err = errors.New("stopped after 10 redirects")
+	return
+}
+
+func clientError(err error) *etcdErr.Error {
+	return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
+}

+ 42 - 83
server/peer_server.go

@@ -1,16 +1,12 @@
 package server
 
 import (
-	"bytes"
-	"encoding/binary"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
 	"math/rand"
 	"net/http"
 	"net/url"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -52,6 +48,7 @@ type PeerServerConfig struct {
 
 type PeerServer struct {
 	Config         PeerServerConfig
+	client         *Client
 	clusterConfig  *ClusterConfig
 	raftServer     raft.Server
 	server         *Server
@@ -86,9 +83,10 @@ type snapshotConf struct {
 	snapshotThr uint64
 }
 
-func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
+func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
 	s := &PeerServer{
 		Config:         psConfig,
+		client:         client,
 		clusterConfig:  NewClusterConfig(),
 		registry:       registry,
 		store:          store,
@@ -410,24 +408,6 @@ func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error {
 	return nil
 }
 
-// getVersion fetches the peer version of a cluster.
-func getVersion(t *transporter, versionURL url.URL) (int, error) {
-	resp, _, err := t.Get(versionURL.String())
-	if err != nil {
-		return 0, err
-	}
-	defer resp.Body.Close()
-
-	body, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return 0, err
-	}
-
-	// Parse version number.
-	version, _ := strconv.Atoi(string(body))
-	return version, nil
-}
-
 // Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
 func (s *PeerServer) Upgradable() error {
 	nextVersion := s.store.Version() + 1
@@ -437,13 +417,12 @@ func (s *PeerServer) Upgradable() error {
 			return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
 		}
 
-		t, _ := s.raftServer.Transporter().(*transporter)
-		checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
-		resp, _, err := t.Get(checkURL)
+		url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String()
+		ok, err := s.client.CheckVersion(url, nextVersion)
 		if err != nil {
-			return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host)
+			return err
 		}
-		if resp.StatusCode != 200 {
+		if !ok {
 			return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
 		}
 	}
@@ -552,73 +531,53 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
 
 // Send join requests to peer.
 func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
-	// t must be ok
-	t, _ := server.Transporter().(*transporter)
+	u := (&url.URL{Host: peer, Scheme: scheme}).String()
 
 	// Our version must match the leader's version
-	versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"}
-	version, err := getVersion(t, versionURL)
+	version, err := s.client.GetVersion(u)
 	if err != nil {
-		return fmt.Errorf("Error during join version check: %v", err)
+		return fmt.Errorf("fail checking join version: %v", err)
 	}
 	if version < store.MinVersion() || version > store.MaxVersion() {
-		return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
+		return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
 	}
 
-	var b bytes.Buffer
-	c := &JoinCommand{
-		MinVersion: store.MinVersion(),
-		MaxVersion: store.MaxVersion(),
-		Name:       server.Name(),
-		RaftURL:    s.Config.URL,
-		EtcdURL:    s.server.URL(),
+	// Fetch current peer list
+	machines, err := s.client.GetMachines(u)
+	if err != nil {
+		return fmt.Errorf("fail getting machine messages: %v", err)
 	}
-	json.NewEncoder(&b).Encode(c)
-
-	joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
-	log.Infof("Send Join Request to %s", joinURL.String())
-
-	req, _ := http.NewRequest("PUT", joinURL.String(), &b)
-	resp, err := t.client.Do(req)
-
-	for {
-		if err != nil {
-			return fmt.Errorf("Unable to join: %v", err)
+	exist := false
+	for _, machine := range machines {
+		if machine.Name == server.Name() {
+			exist = true
+			break
 		}
-		if resp != nil {
-			defer resp.Body.Close()
+	}
 
-			log.Infof("»»»» %d", resp.StatusCode)
-			if resp.StatusCode == http.StatusOK {
-				b, _ := ioutil.ReadAll(resp.Body)
-				s.joinIndex, _ = binary.Uvarint(b)
-				return nil
-			}
-			if resp.StatusCode == http.StatusTemporaryRedirect {
-				address := resp.Header.Get("Location")
-				log.Debugf("Send Join Request to %s", address)
-				c := &JoinCommand{
-					MinVersion: store.MinVersion(),
-					MaxVersion: store.MaxVersion(),
-					Name:       server.Name(),
-					RaftURL:    s.Config.URL,
-					EtcdURL:    s.server.URL(),
-				}
-				json.NewEncoder(&b).Encode(c)
-				resp, _, err = t.Put(address, &b)
-
-			} else if resp.StatusCode == http.StatusBadRequest {
-				log.Debug("Reach max number peers in the cluster")
-				decoder := json.NewDecoder(resp.Body)
-				err := &etcdErr.Error{}
-				decoder.Decode(err)
-				return *err
-			} else {
-				return fmt.Errorf("Unable to join")
-			}
-		}
+	// Fetch cluster config to see whether there is room for one more machine.
+	clusterConfig, err := s.client.GetClusterConfig(u)
+	if err != nil {
+		return fmt.Errorf("fail getting cluster config: %v", err)
+	}
+	if !exist && clusterConfig.ActiveSize <= len(machines) {
+		return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
+	}
 
+	joinIndex, err := s.client.AddMachine(u,
+		&JoinCommand{
+			MinVersion: store.MinVersion(),
+			MaxVersion: store.MaxVersion(),
+			Name:       server.Name(),
+			RaftURL:    s.Config.URL,
+			EtcdURL:    s.server.URL(),
+		})
+	if err != nil {
+		return fmt.Errorf("fail on join request: %v", err)
 	}
+
+	s.joinIndex = joinIndex
+	return nil
 }
 
 func (s *PeerServer) Stats() []byte {

+ 3 - 1
server/peer_server_handlers.go

@@ -225,7 +225,9 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re
 	machines := make([]*machineMessage, 0)
 	leader := ps.raftServer.Leader()
 	for _, name := range ps.registry.Names() {
-		machines = append(machines, ps.getMachineMessage(name, leader))
+		if msg := ps.getMachineMessage(name, leader); msg != nil {
+			machines = append(machines, msg)
+		}
 	}
 	json.NewEncoder(w).Encode(&machines)
 }