Browse Source

server: use /v2/admin/machines/ http endpoint to join

Yicheng Qin 11 years ago
parent
commit
5af8fe9a84
3 changed files with 212 additions and 18 deletions
  1. 21 14
      etcd/etcd.go
  2. 14 4
      etcd/etcd_test.go
  3. 177 0
      etcd/v2_client.go

+ 21 - 14
etcd/etcd.go

@@ -57,6 +57,7 @@ type Server struct {
 	addNodeC    chan raft.Config
 	removeNodeC chan raft.Config
 	t           *transporter
+	client      *v2client
 
 	store.Store
 
@@ -96,6 +97,7 @@ func New(c *config.Config, id int64) *Server {
 		addNodeC:    make(chan raft.Config),
 		removeNodeC: make(chan raft.Config),
 		t:           newTransporter(tc),
+		client:      newClient(tc),
 
 		Store: store.New(),
 
@@ -159,24 +161,29 @@ func (s *Server) Bootstrap() {
 
 func (s *Server) Join() {
 	log.Println("joining cluster via peers", s.config.Peers)
-	d, err := json.Marshal(&raft.Config{s.id, s.raftPubAddr, []byte(s.pubAddr)})
-	if err != nil {
-		panic(err)
+	info := &context{
+		MinVersion: store.MinVersion(),
+		MaxVersion: store.MaxVersion(),
+		ClientURL:  s.pubAddr,
+		PeerURL:    s.raftPubAddr,
 	}
 
-	b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}})
-	if err != nil {
-		panic(err)
-	}
-
-	for seed := range s.nodes {
-		if err := s.t.send(seed+raftPrefix, b); err != nil {
-			log.Println(err)
-			continue
+	succeed := false
+	for i := 0; i < 5; i++ {
+		for seed := range s.nodes {
+			if err := s.client.AddMachine(seed, fmt.Sprint(s.id), info); err == nil {
+				succeed = true
+				break
+			} else {
+				log.Println(err)
+			}
 		}
-		// todo(xiangli) WAIT for join to be committed or retry...
-		break
+		if succeed {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
 	}
+
 	s.run()
 }
 

+ 14 - 4
etcd/etcd_test.go

@@ -259,15 +259,25 @@ func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptes
 func waitCluster(t *testing.T, es []*Server) {
 	n := len(es)
 	for i, e := range es {
-		for k := 1; k < n+1; k++ {
-			w, err := e.Watch(v2machineKVPrefix, true, false, uint64(k))
+		var index uint64
+		for k := 0; k < n; k++ {
+			index++
+			w, err := e.Watch(v2machineKVPrefix, true, false, index)
 			if err != nil {
 				panic(err)
 			}
 			v := <-w.EventChan
-			ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, k-1)
+			// join command may appear several times due to retry
+			// when timeout
+			if k > 0 {
+				pw := fmt.Sprintf("%s/%d", v2machineKVPrefix, k-1)
+				if v.Node.Key == pw {
+					continue
+				}
+			}
+			ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, k)
 			if v.Node.Key != ww {
-				t.Errorf("#%d path = %v, want %v", i, v.Node.Key, w)
+				t.Errorf("#%d path = %v, want %v", i, v.Node.Key, ww)
 			}
 		}
 	}

+ 177 - 0
etcd/v2_client.go

@@ -0,0 +1,177 @@
+package etcd
+
+import (
+	"bytes"
+	"crypto/tls"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"strconv"
+
+	"github.com/coreos/etcd/config"
+	etcdErr "github.com/coreos/etcd/error"
+)
+
+// v2client sends various requests using HTTP API.
+// It is different from raft communication, and doesn't record anything in the log.
+// The argument url is required to contain scheme and host only, and
+// there is no trailing slash in it.
+// Public functions return "etcd/error".Error intentionally to figure out
+// etcd error code easily.
+type v2client struct {
+	http.Client
+}
+
+func newClient(tc *tls.Config) *v2client {
+	tr := new(http.Transport)
+	tr.TLSClientConfig = tc
+	return &v2client{http.Client{Transport: tr}}
+}
+
+// CheckVersion returns true when the version check on the server returns 200.
+func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
+	resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
+	if err != nil {
+		return false, clientError(err)
+	}
+
+	defer resp.Body.Close()
+
+	return resp.StatusCode == 200, nil
+}
+
+// GetVersion fetches the peer version of a cluster.
+func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) {
+	resp, err := c.Get(url + "/version")
+	if err != nil {
+		return 0, clientError(err)
+	}
+
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return 0, clientError(err)
+	}
+
+	// Parse version number.
+	version, err := strconv.Atoi(string(body))
+	if err != nil {
+		return 0, clientError(err)
+	}
+	return version, nil
+}
+
+func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/machines/")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	msgs := new([]*machineMessage)
+	if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
+		return nil, uerr
+	}
+	return *msgs, nil
+}
+
+func (c *v2client) GetClusterConfig(url string) (*config.ClusterConfig, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/config")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	config := new(config.ClusterConfig)
+	if uerr := c.parseJSONResponse(resp, config); uerr != nil {
+		return nil, uerr
+	}
+	return config, nil
+}
+
+// AddMachine adds machine to the cluster.
+// The first return value is the commit index of join command.
+func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.Error {
+	b, _ := json.Marshal(info)
+	url = url + "/v2/admin/machines/" + name
+
+	log.Printf("Send Join Request to %s", url)
+	resp, err := c.put(url, b)
+	if err != nil {
+		return clientError(err)
+	}
+	defer resp.Body.Close()
+
+	if err := c.checkErrorResponse(resp); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *v2client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
+	defer resp.Body.Close()
+
+	if err := c.checkErrorResponse(resp); err != nil {
+		return err
+	}
+	if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
+		log.Printf("Error parsing join response: %v", err)
+		return clientError(err)
+	}
+	return nil
+}
+
+func (c *v2client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
+	if resp.StatusCode != http.StatusOK {
+		uerr := &etcdErr.Error{}
+		if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
+			log.Printf("Error parsing response to etcd error: %v", err)
+			return clientError(err)
+		}
+		return uerr
+	}
+	return nil
+}
+
+// put sends server side PUT request.
+// It always follows redirects instead of stopping according to RFC 2616.
+func (c *v2client) put(urlStr string, body []byte) (*http.Response, error) {
+	return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
+}
+
+func (c *v2client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
+	var req *http.Request
+
+	for redirect := 0; redirect < 10; redirect++ {
+		req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
+		if err != nil {
+			return
+		}
+
+		if resp, err = c.Do(req); err != nil {
+			if resp != nil {
+				resp.Body.Close()
+			}
+			return
+		}
+
+		if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
+			resp.Body.Close()
+			if urlStr = resp.Header.Get("Location"); urlStr == "" {
+				err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
+				return
+			}
+			continue
+		}
+		return
+	}
+
+	err = errors.New("stopped after 10 redirects")
+	return
+}
+
+func clientError(err error) *etcdErr.Error {
+	return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
+}