Browse Source

Merge pull request #1299 from unihorn/162

etcdhttp: add PUT and DELETE on /v2/admin/members/
Yicheng Qin 11 years ago
parent
commit
a8a1d4fd93
4 changed files with 305 additions and 4 deletions
  1. 67 0
      etcdserver/etcdhttp/doc.go
  2. 46 0
      etcdserver/etcdhttp/http.go
  3. 188 4
      etcdserver/etcdhttp/http_test.go
  4. 4 0
      etcdserver/server.go

+ 67 - 0
etcdserver/etcdhttp/doc.go

@@ -0,0 +1,67 @@
+// Copyright 2014 CoreOS Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+
+## Add Member
+
+### Request
+
+`curl http://${remote_client_url}/v2/admin/members/{$id} -XPUT -d 'PeerURLs=${peer_url_1}&PeerURLs=${peer_url_2}'`
+
+Parameter `remote_client_url` is serving client url of the cluster.
+Parameter `id` is the identification of new member in hexadecimal.
+Parameter `peer_url_` is peer urls of the new member.
+
+### Response
+
+Categorized by HTTP status code.
+
+#### HTTP 201
+
+The member is created successfully.
+
+#### HTTP 400
+
+etcd cannot parse out the request.
+
+#### HTTP 500
+
+etcd fails to create the new member.
+
+## Remove Member
+
+### Request
+
+`curl http://${remote_client_url}/v2/admin/members/{$id} -XDELETE`
+
+Parameter `remote_client_url` is serving client url of the cluster.
+Parameter `id` is the identification of member to be removed in hexadecimal.
+
+### Response
+
+#### HTTP 204
+
+The member is removed successfully.
+
+#### HTTP 400
+
+etcd cannot parse out the request.
+
+#### HTTP 500
+
+etcd fails to remove the member.
+
+*/
+package etcdhttp

+ 46 - 0
etcdserver/etcdhttp/http.go

@@ -23,6 +23,7 @@ import (
 const (
 	keysPrefix               = "/v2/keys"
 	deprecatedMachinesPrefix = "/v2/machines"
+	adminMembersPrefix       = "/v2/admin/members/"
 	raftPrefix               = "/raft"
 
 	// time to wait for response from EtcdServer requests
@@ -49,6 +50,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 	// TODO: dynamic configuration may introduce race also.
 	// TODO: add serveMembers
 	mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
+	mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
 	mux.HandleFunc("/", http.NotFound)
 	return mux
 }
@@ -116,6 +118,50 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
 	w.Write([]byte(strings.Join(endpoints, ", ")))
 }
 
+func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r.Method, "PUT", "DELETE") {
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
+	defer cancel()
+	idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
+	id, err := strconv.ParseUint(idStr, 16, 64)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	switch r.Method {
+	case "PUT":
+		if err := r.ParseForm(); err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+		peerURLs := r.PostForm["PeerURLs"]
+		log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
+		m := etcdserver.Member{
+			ID: id,
+			RaftAttributes: etcdserver.RaftAttributes{
+				PeerURLs: peerURLs,
+			},
+		}
+		if err := h.server.AddMember(ctx, m); err != nil {
+			log.Printf("etcdhttp: error adding node %x: %v", id, err)
+			writeError(w, err)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+	case "DELETE":
+		log.Printf("etcdhttp: remove node %x", id)
+		if err := h.server.RemoveMember(ctx, id); err != nil {
+			log.Printf("etcdhttp: error removing node %x: %v", id, err)
+			writeError(w, err)
+			return
+		}
+		w.WriteHeader(http.StatusNoContent)
+	}
+}
+
 func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "POST") {
 		return

+ 188 - 4
etcdserver/etcdhttp/http_test.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -703,7 +704,7 @@ func TestAllowMethod(t *testing.T) {
 }
 
 // errServer implements the etcd.Server interface for testing.
-// It returns the given error from any Do/Process calls.
+// It returns the given error from any Do/Process/AddMember/RemoveMember calls.
 type errServer struct {
 	err error
 }
@@ -716,6 +717,12 @@ func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
 }
 func (fs *errServer) Start() {}
 func (fs *errServer) Stop()  {}
+func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error {
+	return fs.err
+}
+func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
+	return fs.err
+}
 
 // errReader implements io.Reader to facilitate a broken request.
 type errReader struct{}
@@ -839,9 +846,11 @@ type resServer struct {
 func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
 	return rs.res, nil
 }
-func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
-func (rs *resServer) Start()                                            {}
-func (rs *resServer) Stop()                                             {}
+func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error      { return nil }
+func (rs *resServer) Start()                                                 {}
+func (rs *resServer) Stop()                                                  {}
+func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
+func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error         { return nil }
 
 func mustMarshalEvent(t *testing.T, ev *store.Event) string {
 	b := new(bytes.Buffer)
@@ -1234,6 +1243,181 @@ func TestHandleWatchStreaming(t *testing.T) {
 	}
 }
 
+func TestServeAdminMembersFail(t *testing.T) {
+	tests := []struct {
+		req    *http.Request
+		server etcdserver.Server
+
+		wcode int
+	}{
+		{
+			// bad method
+			&http.Request{
+				Method: "CONNECT",
+			},
+			&resServer{},
+
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad method
+			&http.Request{
+				Method: "TRACE",
+			},
+			&resServer{},
+
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// parse id error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(adminMembersPrefix, "badID")),
+				Method: "PUT",
+			},
+			&resServer{},
+
+			http.StatusBadRequest,
+		},
+		{
+			// parse body error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(adminMembersPrefix, "1")),
+				Method: "PUT",
+			},
+			&resServer{},
+
+			http.StatusBadRequest,
+		},
+		{
+			// etcdserver.AddMember error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(adminMembersPrefix, "1")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader("")),
+			},
+			&errServer{
+				errors.New("blah"),
+			},
+
+			http.StatusInternalServerError,
+		},
+		{
+			// etcdserver.RemoveMember error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(adminMembersPrefix, "1")),
+				Method: "DELETE",
+			},
+			&errServer{
+				errors.New("blah"),
+			},
+
+			http.StatusInternalServerError,
+		},
+	}
+	for i, tt := range tests {
+		h := &serverHandler{
+			server: tt.server,
+		}
+		rw := httptest.NewRecorder()
+		h.serveAdminMembers(rw, tt.req)
+		if rw.Code != tt.wcode {
+			t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
+		}
+	}
+}
+
+type action struct {
+	name   string
+	params []interface{}
+}
+
+type serverRecorder struct {
+	actions []action
+}
+
+func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
+	s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
+	return etcdserver.Response{}, nil
+}
+func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
+	s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
+	return nil
+}
+func (s *serverRecorder) Start() {}
+func (s *serverRecorder) Stop()  {}
+func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error {
+	s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
+	return nil
+}
+func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
+	s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}})
+	return nil
+}
+
+func TestServeAdminMembersPut(t *testing.T) {
+	u := mustNewURL(t, path.Join(adminMembersPrefix, "BEEF"))
+	form := url.Values{"PeerURLs": []string{"http://a", "http://b"}}
+	body := strings.NewReader(form.Encode())
+	req, err := http.NewRequest("PUT", u.String(), body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	s := &serverRecorder{}
+	h := &serverHandler{
+		server: s,
+	}
+	rw := httptest.NewRecorder()
+
+	h.serveAdminMembers(rw, req)
+
+	wcode := http.StatusCreated
+	if rw.Code != wcode {
+		t.Errorf("code=%d, want %d", rw.Code, wcode)
+	}
+	g := rw.Body.String()
+	if g != "" {
+		t.Errorf("got body=%q, want %q", g, "")
+	}
+	wm := etcdserver.Member{
+		ID: 0xBEEF,
+		RaftAttributes: etcdserver.RaftAttributes{
+			PeerURLs: []string{"http://a", "http://b"},
+		},
+	}
+	wactions := []action{{name: "AddMember", params: []interface{}{wm}}}
+	if !reflect.DeepEqual(s.actions, wactions) {
+		t.Errorf("actions = %+v, want %+v", s.actions, wactions)
+	}
+}
+
+func TestServeAdminMembersDelete(t *testing.T) {
+	req := &http.Request{
+		Method: "DELETE",
+		URL:    mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")),
+	}
+	s := &serverRecorder{}
+	h := &serverHandler{
+		server: s,
+	}
+	rw := httptest.NewRecorder()
+
+	h.serveAdminMembers(rw, req)
+
+	wcode := http.StatusNoContent
+	if rw.Code != wcode {
+		t.Errorf("code=%d, want %d", rw.Code, wcode)
+	}
+	g := rw.Body.String()
+	if g != "" {
+		t.Errorf("got body=%q, want %q", g, "")
+	}
+	wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}}
+	if !reflect.DeepEqual(s.actions, wactions) {
+		t.Errorf("actions = %+v, want %+v", s.actions, wactions)
+	}
+}
+
 type fakeCluster struct {
 	members []etcdserver.Member
 }

+ 4 - 0
etcdserver/server.go

@@ -75,6 +75,10 @@ type Server interface {
 	// Process takes a raft message and applies it to the server's raft state
 	// machine, respecting any timeout of the given context.
 	Process(ctx context.Context, m raftpb.Message) error
+	// AddMember attempts to add a member into the cluster.
+	AddMember(ctx context.Context, memb Member) error
+	// RemoveMember attempts to remove a member from the cluster.
+	RemoveMember(ctx context.Context, id uint64) error
 }
 
 type RaftTimer interface {