Browse Source

etcdserver: forward member promote to leader

Jingyi Hu 6 years ago
parent
commit
f5eaaaf440

+ 88 - 10
etcdserver/api/etcdhttp/peer.go

@@ -16,56 +16,82 @@ package etcdhttp
 
 import (
 	"encoding/json"
+	"fmt"
 	"net/http"
+	"strconv"
+	"strings"
 
 	"go.etcd.io/etcd/etcdserver"
 	"go.etcd.io/etcd/etcdserver/api"
+	"go.etcd.io/etcd/etcdserver/api/membership"
 	"go.etcd.io/etcd/etcdserver/api/rafthttp"
 	"go.etcd.io/etcd/lease/leasehttp"
+	"go.etcd.io/etcd/pkg/types"
 
 	"go.uber.org/zap"
 )
 
 const (
-	peerMembersPrefix = "/members"
+	peerMembersPath         = "/members"
+	peerMemberPromotePrefix = "/members/promote/"
 )
 
 // NewPeerHandler generates an http.Handler to handle etcd peer requests.
 func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler {
-	return newPeerHandler(lg, s.Cluster(), s.RaftHandler(), s.LeaseHandler())
+	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler())
 }
 
-func newPeerHandler(lg *zap.Logger, cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
-	mh := &peerMembersHandler{
-		lg:      lg,
-		cluster: cluster,
-	}
+func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
+	peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
+	peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", http.NotFound)
 	mux.Handle(rafthttp.RaftPrefix, raftHandler)
 	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
-	mux.Handle(peerMembersPrefix, mh)
+	mux.Handle(peerMembersPath, peerMembersHandler)
+	mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
 	if leaseHandler != nil {
 		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
 		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
 	}
-	mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
+	mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
 	return mux
 }
 
+func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
+	return &peerMembersHandler{
+		lg:      lg,
+		cluster: cluster,
+	}
+}
+
 type peerMembersHandler struct {
 	lg      *zap.Logger
 	cluster api.Cluster
 }
 
+func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
+	return &peerMemberPromoteHandler{
+		lg:      lg,
+		cluster: s.Cluster(),
+		server:  s,
+	}
+}
+
+type peerMemberPromoteHandler struct {
+	lg      *zap.Logger
+	cluster api.Cluster
+	server  etcdserver.Server
+}
+
 func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r, "GET") {
 		return
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
 
-	if r.URL.Path != peerMembersPrefix {
+	if r.URL.Path != peerMembersPath {
 		http.Error(w, "bad path", http.StatusBadRequest)
 		return
 	}
@@ -79,3 +105,55 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 }
+
+func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r, "POST") {
+		return
+	}
+	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+	if !strings.HasPrefix(r.URL.Path, peerMemberPromotePrefix) {
+		http.Error(w, "bad path", http.StatusBadRequest)
+		return
+	}
+	idStr := strings.TrimPrefix(r.URL.Path, peerMemberPromotePrefix)
+	id, err := strconv.ParseUint(idStr, 10, 64)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("member %s not found in cluster", idStr), http.StatusNotFound)
+		return
+	}
+
+	resp, err := h.server.PromoteMember(r.Context(), id)
+	if err != nil {
+		switch err {
+		case membership.ErrIDNotFound:
+			http.Error(w, err.Error(), http.StatusNotFound)
+		case membership.ErrMemberNotLearner:
+			http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		case membership.ErrLearnerNotReady:
+			http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		default:
+			WriteError(h.lg, w, r, err)
+		}
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to promote a member",
+				zap.String("member-id", types.ID(id).String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("error promoting member %s (%v)", types.ID(id).String(), err)
+		}
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	if err := json.NewEncoder(w).Encode(resp); err != nil {
+		if h.lg != nil {
+			h.lg.Warn("failed to encode members response", zap.Error(err))
+		} else {
+			plog.Warningf("failed to encode members response (%v)", err)
+		}
+	}
+}

+ 131 - 9
etcdserver/api/etcdhttp/peer_test.go

@@ -15,19 +15,24 @@
 package etcdhttp
 
 import (
+	"context"
 	"encoding/json"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"path"
 	"sort"
+	"strings"
 	"testing"
 
 	"go.uber.org/zap"
 
 	"github.com/coreos/go-semver/semver"
+	"go.etcd.io/etcd/etcdserver/api"
 	"go.etcd.io/etcd/etcdserver/api/membership"
 	"go.etcd.io/etcd/etcdserver/api/rafthttp"
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/pkg/testutil"
 	"go.etcd.io/etcd/pkg/types"
 )
@@ -51,13 +56,34 @@ func (c *fakeCluster) Members() []*membership.Member {
 func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
 func (c *fakeCluster) Version() *semver.Version              { return nil }
 
+type fakeServer struct {
+	cluster api.Cluster
+}
+
+func (s *fakeServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
+	return nil, fmt.Errorf("AddMember not implemented in fakeServer")
+}
+func (s *fakeServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	return nil, fmt.Errorf("RemoveMember not implemented in fakeServer")
+}
+func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) {
+	return nil, fmt.Errorf("UpdateMember not implemented in fakeServer")
+}
+func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
+}
+func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
+func (s *fakeServer) Cluster() api.Cluster            { return s.cluster }
+func (s *fakeServer) Alarms() []*pb.AlarmMember       { return nil }
+
+var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	w.Write([]byte("test data"))
+})
+
 // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
 // handles raft-prefix requests well.
 func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
-	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.Write([]byte("test data"))
-	})
-	ph := newPeerHandler(zap.NewExample(), &fakeCluster{}, h, nil)
+	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
 
@@ -80,6 +106,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
 	}
 }
 
+// TestServeMembersFails ensures peerMembersHandler only accepts GET request
 func TestServeMembersFails(t *testing.T) {
 	tests := []struct {
 		method string
@@ -89,6 +116,10 @@ func TestServeMembersFails(t *testing.T) {
 			"POST",
 			http.StatusMethodNotAllowed,
 		},
+		{
+			"PUT",
+			http.StatusMethodNotAllowed,
+		},
 		{
 			"DELETE",
 			http.StatusMethodNotAllowed,
@@ -100,8 +131,12 @@ func TestServeMembersFails(t *testing.T) {
 	}
 	for i, tt := range tests {
 		rw := httptest.NewRecorder()
-		h := &peerMembersHandler{cluster: nil}
-		h.ServeHTTP(rw, &http.Request{Method: tt.method})
+		h := newPeerMembersHandler(nil, &fakeCluster{})
+		req, err := http.NewRequest(tt.method, "", nil)
+		if err != nil {
+			t.Fatalf("#%d: failed to create http request: %v", i, err)
+		}
+		h.ServeHTTP(rw, req)
 		if rw.Code != tt.wcode {
 			t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
 		}
@@ -115,7 +150,7 @@ func TestServeMembersGet(t *testing.T) {
 		id:      1,
 		members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
 	}
-	h := &peerMembersHandler{cluster: cluster}
+	h := newPeerMembersHandler(nil, cluster)
 	msb, err := json.Marshal([]membership.Member{memb1, memb2})
 	if err != nil {
 		t.Fatal(err)
@@ -128,8 +163,8 @@ func TestServeMembersGet(t *testing.T) {
 		wct   string
 		wbody string
 	}{
-		{peerMembersPrefix, http.StatusOK, "application/json", wms},
-		{path.Join(peerMembersPrefix, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
+		{peerMembersPath, http.StatusOK, "application/json", wms},
+		{path.Join(peerMembersPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
 	}
 
 	for i, tt := range tests {
@@ -156,3 +191,90 @@ func TestServeMembersGet(t *testing.T) {
 		}
 	}
 }
+
+// TestServeMemberPromoteFails ensures peerMemberPromoteHandler only accepts POST request
+func TestServeMemberPromoteFails(t *testing.T) {
+	tests := []struct {
+		method string
+		wcode  int
+	}{
+		{
+			"GET",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			"PUT",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			"DELETE",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			"BAD",
+			http.StatusMethodNotAllowed,
+		},
+	}
+	for i, tt := range tests {
+		rw := httptest.NewRecorder()
+		h := newPeerMemberPromoteHandler(nil, &fakeServer{cluster: &fakeCluster{}})
+		req, err := http.NewRequest(tt.method, "", nil)
+		if err != nil {
+			t.Fatalf("#%d: failed to create http request: %v", i, err)
+		}
+		h.ServeHTTP(rw, req)
+		if rw.Code != tt.wcode {
+			t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
+		}
+	}
+}
+
+// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
+func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
+	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil)
+	srv := httptest.NewServer(ph)
+	defer srv.Close()
+
+	tests := []struct {
+		path      string
+		wcode     int
+		checkBody bool
+		wKeyWords string
+	}{
+		{
+			// does not contain member id in path
+			peerMemberPromotePrefix,
+			http.StatusNotFound,
+			false,
+			"",
+		},
+		{
+			// try to promote member id = 1
+			peerMemberPromotePrefix + "1",
+			http.StatusInternalServerError,
+			true,
+			"PromoteMember not implemented in fakeServer",
+		},
+	}
+	for i, tt := range tests {
+		req, err := http.NewRequest("POST", srv.URL+tt.path, nil)
+		if err != nil {
+			t.Fatalf("failed to create request: %v", err)
+		}
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			t.Fatalf("failed to get http response: %v", err)
+		}
+		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			t.Fatalf("unexpected ioutil.ReadAll error: %v", err)
+		}
+		if resp.StatusCode != tt.wcode {
+			t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode)
+		}
+		if tt.checkBody && strings.Contains(string(body), tt.wKeyWords) {
+			t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords)
+		}
+	}
+}

+ 46 - 0
etcdserver/cluster_util.go

@@ -15,11 +15,13 @@
 package etcdserver
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"sort"
+	"strings"
 	"time"
 
 	"go.etcd.io/etcd/etcdserver/api/membership"
@@ -355,3 +357,47 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*ve
 	}
 	return nil, err
 }
+
+func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.RoundTripper) ([]*membership.Member, error) {
+	cc := &http.Client{Transport: peerRt}
+	// TODO: refactor member http handler code
+	// cannot import etcdhttp, so manually construct url
+	requestUrl := url + "/members/promote/" + fmt.Sprintf("%d", id)
+	req, err := http.NewRequest("POST", requestUrl, nil)
+	if err != nil {
+		return nil, err
+	}
+	req = req.WithContext(ctx)
+	resp, err := cc.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode == http.StatusRequestTimeout {
+		return nil, ErrTimeout
+	}
+	if resp.StatusCode == http.StatusPreconditionFailed {
+		// both ErrMemberNotLearner and ErrLearnerNotReady have same http status code
+		if strings.Contains(string(b), membership.ErrLearnerNotReady.Error()) {
+			return nil, membership.ErrLearnerNotReady
+		}
+		if strings.Contains(string(b), membership.ErrMemberNotLearner.Error()) {
+			return nil, membership.ErrMemberNotLearner
+		}
+		return nil, fmt.Errorf("member promote: unknown error(%s)", string(b))
+	}
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, membership.ErrIDNotFound
+	}
+
+	var membs []*membership.Member
+	if err := json.Unmarshal(b, &membs); err != nil {
+		return nil, err
+	}
+	return membs, nil
+}

+ 32 - 0
etcdserver/server.go

@@ -1635,6 +1635,38 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
 
 // PromoteMember promotes a learner node to a voting node.
 func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	resp, err := s.promoteMember(ctx, id)
+	if err != ErrNotLeader {
+		return resp, err
+	}
+
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+	// forward to leader
+	for cctx.Err() == nil {
+		leader, err := s.waitLeader(cctx)
+		if err != nil {
+			return nil, err
+		}
+		for _, url := range leader.PeerURLs {
+			resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
+			if err == nil {
+				return resp, nil
+			}
+			// If member promotion failed, return early. Otherwise keep retry.
+			if err == membership.ErrIDNotFound || err == membership.ErrLearnerNotReady || err == membership.ErrMemberNotLearner {
+				return nil, err
+			}
+		}
+	}
+
+	if cctx.Err() == context.DeadlineExceeded {
+		return nil, ErrTimeout
+	}
+	return nil, ErrCanceled
+}
+
+func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
 	if err := s.checkMembershipOperationPermission(ctx); err != nil {
 		return nil, err
 	}

+ 10 - 2
etcdserver/v3_server.go

@@ -260,7 +260,11 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 			}
 		}
 	}
-	return -1, ErrTimeout
+
+	if cctx.Err() == context.DeadlineExceeded {
+		return -1, ErrTimeout
+	}
+	return -1, ErrCanceled
 }
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
@@ -303,7 +307,11 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
 			}
 		}
 	}
-	return nil, ErrTimeout
+
+	if cctx.Err() == context.DeadlineExceeded {
+		return nil, ErrTimeout
+	}
+	return nil, ErrCanceled
 }
 
 func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {