Browse Source

*: support updating advertise-peer-url
Users might want to update the peerurl of the etcd member in several cases.
For example, if the IP address of the physical machine etcd running on is
changed, user need to update the adversite-pee-rurl accordingly.
This commit makes etcd support updating the advertise-peer-url of its members.

Xiang Li 11 years ago
parent
commit
5967794009

+ 41 - 4
etcdserver/cluster.go

@@ -263,12 +263,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st }
 // ensures that it is still valid.
 // ensures that it is still valid.
 func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 	members, removed := membersFromStore(c.store)
 	members, removed := membersFromStore(c.store)
-	if removed[types.ID(cc.NodeID)] {
+	id := types.ID(cc.NodeID)
+	if removed[id] {
 		return ErrIDRemoved
 		return ErrIDRemoved
 	}
 	}
 	switch cc.Type {
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
 	case raftpb.ConfChangeAddNode:
-		if members[types.ID(cc.NodeID)] != nil {
+		if members[id] != nil {
 			return ErrIDExists
 			return ErrIDExists
 		}
 		}
 		urls := make(map[string]bool)
 		urls := make(map[string]bool)
@@ -287,11 +288,33 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 			}
 			}
 		}
 		}
 	case raftpb.ConfChangeRemoveNode:
 	case raftpb.ConfChangeRemoveNode:
-		if members[types.ID(cc.NodeID)] == nil {
+		if members[id] == nil {
 			return ErrIDNotFound
 			return ErrIDNotFound
 		}
 		}
+	case raftpb.ConfChangeUpdateNode:
+		if members[id] == nil {
+			return ErrIDNotFound
+		}
+		urls := make(map[string]bool)
+		for _, m := range members {
+			if m.ID == id {
+				continue
+			}
+			for _, u := range m.PeerURLs {
+				urls[u] = true
+			}
+		}
+		m := new(Member)
+		if err := json.Unmarshal(cc.Context, m); err != nil {
+			log.Panicf("unmarshal member should never fail: %v", err)
+		}
+		for _, u := range m.PeerURLs {
+			if urls[u] {
+				return ErrPeerURLexists
+			}
+		}
 	default:
 	default:
-		log.Panicf("ConfChange type should be either AddNode or RemoveNode")
+		log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -341,6 +364,20 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) {
 	c.members[id].Attributes = attr
 	c.members[id].Attributes = attr
 }
 }
 
 
+func (c *Cluster) UpdateMember(nm *Member) {
+	c.Lock()
+	defer c.Unlock()
+	b, err := json.Marshal(nm.RaftAttributes)
+	if err != nil {
+		log.Panicf("marshal raftAttributes should never fail: %v", err)
+	}
+	p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix)
+	if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
+		log.Panicf("update raftAttributes should never fail: %v", err)
+	}
+	c.members[nm.ID].RaftAttributes = nm.RaftAttributes
+}
+
 // nodeToMember builds member through a store node.
 // nodeToMember builds member through a store node.
 // the child nodes of the given node should be sorted by key.
 // the child nodes of the given node should be sorted by key.
 func nodeToMember(n *store.NodeExtern) (*Member, error) {
 func nodeToMember(n *store.NodeExtern) (*Member, error) {

+ 53 - 2
etcdserver/cluster_test.go

@@ -362,7 +362,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 	cl.RemoveMember(4)
 	cl.RemoveMember(4)
 
 
 	attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
 	attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
-	cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
+	ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
+	ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}}
+	ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
+	ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr})
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -403,7 +421,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 			raftpb.ConfChange{
 			raftpb.ConfChange{
 				Type:    raftpb.ConfChangeAddNode,
 				Type:    raftpb.ConfChangeAddNode,
 				NodeID:  5,
 				NodeID:  5,
-				Context: cxt,
+				Context: ctx,
 			},
 			},
 			ErrPeerURLexists,
 			ErrPeerURLexists,
 		},
 		},
@@ -414,6 +432,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 			},
 			},
 			ErrIDNotFound,
 			ErrIDNotFound,
 		},
 		},
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  5,
+				Context: ctx5,
+			},
+			nil,
+		},
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeUpdateNode,
+				NodeID:  5,
+				Context: ctx,
+			},
+			ErrIDNotFound,
+		},
+		// try to change the peer url of 2 to the peer url of 3
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeUpdateNode,
+				NodeID:  2,
+				Context: ctx2to3,
+			},
+			ErrPeerURLexists,
+		},
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeUpdateNode,
+				NodeID:  2,
+				Context: ctx2to5,
+			},
+			nil,
+		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		err := cl.ValidateConfigurationChange(tt.cc)
 		err := cl.ValidateConfigurationChange(tt.cc)

+ 65 - 26
etcdserver/etcdhttp/client.go

@@ -148,7 +148,7 @@ type membersHandler struct {
 }
 }
 
 
 func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
+	if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
 		return
 		return
 	}
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
 	w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
@@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			log.Printf("etcdhttp: %v", err)
 			log.Printf("etcdhttp: %v", err)
 		}
 		}
 	case "POST":
 	case "POST":
-		ctype := r.Header.Get("Content-Type")
-		if ctype != "application/json" {
-			writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
-			return
-		}
-		b, err := ioutil.ReadAll(r.Body)
-		if err != nil {
-			writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
-			return
-		}
 		req := httptypes.MemberCreateRequest{}
 		req := httptypes.MemberCreateRequest{}
-		if err := json.Unmarshal(b, &req); err != nil {
-			writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
+		if ok := unmarshalRequest(r, &req, w); !ok {
 			return
 			return
 		}
 		}
-
 		now := h.clock.Now()
 		now := h.clock.Now()
 		m := etcdserver.NewMember("", req.PeerURLs, "", &now)
 		m := etcdserver.NewMember("", req.PeerURLs, "", &now)
-		err = h.server.AddMember(ctx, *m)
+		err := h.server.AddMember(ctx, *m)
 		switch {
 		switch {
 		case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
 		case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
 			writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
 			writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
@@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			log.Printf("etcdhttp: %v", err)
 			log.Printf("etcdhttp: %v", err)
 		}
 		}
 	case "DELETE":
 	case "DELETE":
-		idStr := trimPrefix(r.URL.Path, membersPrefix)
-		if idStr == "" {
-			http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
-			return
-		}
-		id, err := types.IDFromString(idStr)
-		if err != nil {
-			writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
+		id, ok := getID(r.URL.Path, w)
+		if !ok {
 			return
 			return
 		}
 		}
-		err = h.server.RemoveMember(ctx, uint64(id))
+		err := h.server.RemoveMember(ctx, uint64(id))
 		switch {
 		switch {
 		case err == etcdserver.ErrIDRemoved:
 		case err == etcdserver.ErrIDRemoved:
-			writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr)))
+			writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
 		case err == etcdserver.ErrIDNotFound:
 		case err == etcdserver.ErrIDNotFound:
-			writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
+			writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
 		case err != nil:
 		case err != nil:
 			log.Printf("etcdhttp: error removing node %s: %v", id, err)
 			log.Printf("etcdhttp: error removing node %s: %v", id, err)
 			writeError(w, err)
 			writeError(w, err)
 		default:
 		default:
 			w.WriteHeader(http.StatusNoContent)
 			w.WriteHeader(http.StatusNoContent)
 		}
 		}
+	case "PUT":
+		id, ok := getID(r.URL.Path, w)
+		if !ok {
+			return
+		}
+		req := httptypes.MemberUpdateRequest{}
+		if ok := unmarshalRequest(r, &req, w); !ok {
+			return
+		}
+		m := etcdserver.Member{
+			ID:             id,
+			RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
+		}
+		err := h.server.UpdateMember(ctx, m)
+		switch {
+		case err == etcdserver.ErrPeerURLexists:
+			writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
+		case err == etcdserver.ErrIDNotFound:
+			writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
+		case err != nil:
+			log.Printf("etcdhttp: error updating node %s: %v", m.ID, err)
+			writeError(w, err)
+		default:
+			w.WriteHeader(http.StatusNoContent)
+		}
 	}
 	}
 }
 }
 
 
@@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error {
 	return err
 	return err
 }
 }
 
 
+func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool {
+	ctype := r.Header.Get("Content-Type")
+	if ctype != "application/json" {
+		writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
+		return false
+	}
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
+		return false
+	}
+	if err := req.UnmarshalJSON(b); err != nil {
+		writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
+		return false
+	}
+	return true
+}
+
+func getID(p string, w http.ResponseWriter) (types.ID, bool) {
+	idStr := trimPrefix(p, membersPrefix)
+	if idStr == "" {
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return 0, false
+	}
+	id, err := types.IDFromString(idStr)
+	if err != nil {
+		writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
+		return 0, false
+	}
+	return id, true
+}
+
 // getUint64 extracts a uint64 by the given key from a Form. If the key does
 // getUint64 extracts a uint64 by the given key from a Form. If the key does
 // not exist in the form, 0 is returned. If the key exists but the value is
 // not exist in the form, 0 is returned. If the key exists but the value is
 // badly formed, an error is returned. If multiple values are present only the
 // badly formed, an error is returned. If multiple values are present only the

+ 188 - 5
etcdserver/etcdhttp/client_test.go

@@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
 	return nil
 	return nil
 }
 }
 
 
+func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error {
+	s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}})
+	return nil
+}
+
 type action struct {
 type action struct {
 	name   string
 	name   string
 	params []interface{}
 	params []interface{}
@@ -136,11 +141,12 @@ type resServer struct {
 func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
 func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
 	return rs.res, nil
 	return rs.res, nil
 }
 }
-func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error      { return nil }
-func (rs *resServer) Start()                                                 {}
-func (rs *resServer) Stop()                                                  {}
-func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
-func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error         { return nil }
+func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error         { return nil }
+func (rs *resServer) Start()                                                    {}
+func (rs *resServer) Stop()                                                     {}
+func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error    { return nil }
+func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error            { return nil }
+func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil }
 
 
 func boolp(b bool) *bool { return &b }
 func boolp(b bool) *bool { return &b }
 
 
@@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestServeMembersUpdate(t *testing.T) {
+	u := mustNewURL(t, path.Join(membersPrefix, "1"))
+	b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`)
+	req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b))
+	if err != nil {
+		t.Fatal(err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+	s := &serverRecorder{}
+	h := &membersHandler{
+		server:      s,
+		clock:       clockwork.NewFakeClock(),
+		clusterInfo: &fakeCluster{id: 1},
+	}
+	rw := httptest.NewRecorder()
+
+	h.ServeHTTP(rw, req)
+
+	wcode := http.StatusNoContent
+	if rw.Code != wcode {
+		t.Errorf("code=%d, want %d", rw.Code, wcode)
+	}
+
+	gcid := rw.Header().Get("X-Etcd-Cluster-ID")
+	wcid := h.clusterInfo.ID().String()
+	if gcid != wcid {
+		t.Errorf("cid = %s, want %s", gcid, wcid)
+	}
+
+	wm := etcdserver.Member{
+		ID: 1,
+		RaftAttributes: etcdserver.RaftAttributes{
+			PeerURLs: []string{"http://127.0.0.1:1"},
+		},
+	}
+
+	wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}}
+	if !reflect.DeepEqual(s.actions, wactions) {
+		t.Errorf("actions = %+v, want %+v", s.actions, wactions)
+	}
+}
+
 func TestServeMembersFail(t *testing.T) {
 func TestServeMembersFail(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		req    *http.Request
 		req    *http.Request
@@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) {
 			},
 			},
 			nil,
 			nil,
 
 
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// parse body error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader("bad json")),
+				Header: map[string][]string{"Content-Type": []string{"application/json"}},
+			},
+			&resServer{},
+
+			http.StatusBadRequest,
+		},
+		{
+			// bad content type
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
+				Header: map[string][]string{"Content-Type": []string{"application/bad"}},
+			},
+			&errServer{},
+
+			http.StatusUnsupportedMediaType,
+		},
+		{
+			// bad url
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)),
+				Header: map[string][]string{"Content-Type": []string{"application/json"}},
+			},
+			&errServer{},
+
+			http.StatusBadRequest,
+		},
+		{
+			// etcdserver.UpdateMember error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
+				Header: map[string][]string{"Content-Type": []string{"application/json"}},
+			},
+			&errServer{
+				errors.New("blah"),
+			},
+
+			http.StatusInternalServerError,
+		},
+		{
+			// etcdserver.UpdateMember error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
+				Header: map[string][]string{"Content-Type": []string{"application/json"}},
+			},
+			&errServer{
+				etcdserver.ErrPeerURLexists,
+			},
+
+			http.StatusConflict,
+		},
+		{
+			// etcdserver.UpdateMember error
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "0")),
+				Method: "PUT",
+				Body:   ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
+				Header: map[string][]string{"Content-Type": []string{"application/json"}},
+			},
+			&errServer{
+				etcdserver.ErrIDNotFound,
+			},
+
+			http.StatusNotFound,
+		},
+		{
+			// etcdserver.UpdateMember error with badly formed ID
+			&http.Request{
+				URL:    mustNewURL(t, path.Join(membersPrefix, "bad_id")),
+				Method: "PUT",
+			},
+			nil,
+
+			http.StatusNotFound,
+		},
+		{
+			// etcdserver.UpdateMember with no ID
+			&http.Request{
+				URL:    mustNewURL(t, membersPrefix),
+				Method: "PUT",
+			},
+			nil,
+
 			http.StatusMethodNotAllowed,
 			http.StatusMethodNotAllowed,
 		},
 		},
 	}
 	}
@@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestGetID(t *testing.T) {
+	tests := []struct {
+		path string
+
+		wok   bool
+		wid   types.ID
+		wcode int
+	}{
+		{
+			"123",
+			true, 0x123, http.StatusOK,
+		},
+		{
+			"bad_id",
+			false, 0, http.StatusNotFound,
+		},
+		{
+			"",
+			false, 0, http.StatusMethodNotAllowed,
+		},
+	}
+
+	for i, tt := range tests {
+		w := httptest.NewRecorder()
+		id, ok := getID(tt.path, w)
+		if id != tt.wid {
+			t.Errorf("#%d: id = %d, want %d", i, id, tt.wid)
+		}
+		if ok != tt.wok {
+			t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok)
+		}
+		if w.Code != tt.wcode {
+			t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode)
+		}
+	}
+}
+
 type dummyStats struct {
 type dummyStats struct {
 	data []byte
 	data []byte
 }
 }

+ 3 - 0
etcdserver/etcdhttp/http_test.go

@@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error {
 func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
 func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
 	return fs.err
 	return fs.err
 }
 }
+func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error {
+	return fs.err
+}
 
 
 func TestWriteError(t *testing.T) {
 func TestWriteError(t *testing.T) {
 	// nil error should not panic
 	// nil error should not panic

+ 4 - 0
etcdserver/etcdhttp/httptypes/member.go

@@ -33,6 +33,10 @@ type MemberCreateRequest struct {
 	PeerURLs types.URLs
 	PeerURLs types.URLs
 }
 }
 
 
+type MemberUpdateRequest struct {
+	MemberCreateRequest
+}
+
 func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) {
 func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) {
 	s := struct {
 	s := struct {
 		PeerURLs []string `json:"peerURLs"`
 		PeerURLs []string `json:"peerURLs"`

+ 23 - 0
etcdserver/sender.go

@@ -21,6 +21,9 @@ import (
 	"fmt"
 	"fmt"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
+	"net/url"
+	"path"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
@@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) {
 	delete(h.senders, id)
 	delete(h.senders, id)
 }
 }
 
 
+func (h *sendHub) Update(m *Member) {
+	// TODO: return error or just panic?
+	if _, ok := h.senders[m.ID]; !ok {
+		return
+	}
+	peerURL := m.PickPeerURL()
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
+	s := h.senders[m.ID]
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.u = u.String()
+}
+
 type sender struct {
 type sender struct {
 	u   string
 	u   string
 	cid types.ID
 	cid types.ID
 	c   *http.Client
 	c   *http.Client
 	fs  *stats.FollowerStats
 	fs  *stats.FollowerStats
 	q   chan []byte
 	q   chan []byte
+	mu  sync.RWMutex
 }
 }
 
 
 func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
 func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
@@ -159,7 +180,9 @@ func (s *sender) handle() {
 // post POSTs a data payload to a url. Returns nil if the POST succeeds,
 // post POSTs a data payload to a url. Returns nil if the POST succeeds,
 // error on any failure.
 // error on any failure.
 func (s *sender) post(data []byte) error {
 func (s *sender) post(data []byte) error {
+	s.mu.RLock()
 	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
 	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
+	s.mu.RUnlock()
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("new request to %s error: %v", s.u, err)
 		return fmt.Errorf("new request to %s error: %v", s.u, err)
 	}
 	}

+ 31 - 1
etcdserver/server.go

@@ -89,6 +89,7 @@ type Sender interface {
 	Send(m []raftpb.Message)
 	Send(m []raftpb.Message)
 	Add(m *Member)
 	Add(m *Member)
 	Remove(id types.ID)
 	Remove(id types.ID)
+	Update(m *Member)
 	Stop()
 	Stop()
 }
 }
 
 
@@ -114,7 +115,7 @@ type Server interface {
 	// Stop terminates the Server and performs any necessary finalization.
 	// Stop terminates the Server and performs any necessary finalization.
 	// Do and Process cannot be called after Stop has been invoked.
 	// Do and Process cannot be called after Stop has been invoked.
 	Stop()
 	Stop()
-	// Do takes a request and attempts to fulfil it, returning a Response.
+	// Do takes a request and attempts to fulfill it, returning a Response.
 	Do(ctx context.Context, r pb.Request) (Response, error)
 	Do(ctx context.Context, r pb.Request) (Response, error)
 	// Process takes a raft message and applies it to the server's raft state
 	// Process takes a raft message and applies it to the server's raft state
 	// machine, respecting any timeout of the given context.
 	// machine, respecting any timeout of the given context.
@@ -127,6 +128,10 @@ type Server interface {
 	// return ErrIDRemoved if member ID is removed from the cluster, or return
 	// return ErrIDRemoved if member ID is removed from the cluster, or return
 	// ErrIDNotFound if member ID is not in the cluster.
 	// ErrIDNotFound if member ID is not in the cluster.
 	RemoveMember(ctx context.Context, id uint64) error
 	RemoveMember(ctx context.Context, id uint64) error
+
+	// UpdateMember attempts to update a existing member in the cluster. It will
+	// return ErrIDNotFound if the member ID does not exist.
+	UpdateMember(ctx context.Context, updateMemb Member) error
 }
 }
 
 
 type Stats interface {
 type Stats interface {
@@ -475,6 +480,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
 	return s.configure(ctx, cc)
 	return s.configure(ctx, cc)
 }
 }
 
 
+func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
+	b, err := json.Marshal(memb)
+	if err != nil {
+		return err
+	}
+	cc := raftpb.ConfChange{
+		ID:      GenID(),
+		Type:    raftpb.ConfChangeUpdateNode,
+		NodeID:  uint64(memb.ID),
+		Context: b,
+	}
+	return s.configure(ctx, cc)
+}
+
 // Implement the RaftTimer interface
 // Implement the RaftTimer interface
 func (s *EtcdServer) Index() uint64 {
 func (s *EtcdServer) Index() uint64 {
 	return atomic.LoadUint64(&s.raftIndex)
 	return atomic.LoadUint64(&s.raftIndex)
@@ -672,6 +691,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
 		s.Cluster.RemoveMember(id)
 		s.Cluster.RemoveMember(id)
 		s.sender.Remove(id)
 		s.sender.Remove(id)
 		log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID())
 		log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID())
+	case raftpb.ConfChangeUpdateNode:
+		m := new(Member)
+		if err := json.Unmarshal(cc.Context, m); err != nil {
+			log.Panicf("unmarshal member should never fail: %v", err)
+		}
+		if cc.NodeID != uint64(m.ID) {
+			log.Panicf("nodeID should always be equal to member ID")
+		}
+		s.Cluster.UpdateMember(m)
+		s.sender.Update(m)
+		log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 38 - 1
etcdserver/server_test.go

@@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) {
 		},
 		},
 		{
 		{
 			raftpb.ConfChange{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeRemoveNode,
+				Type:   raftpb.ConfChangeUpdateNode,
 				NodeID: 4,
 				NodeID: 4,
 			},
 			},
 			ErrIDRemoved,
 			ErrIDRemoved,
@@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) {
 	}
 	}
 }
 }
 func (s *fakeSender) Add(m *Member)      {}
 func (s *fakeSender) Add(m *Member)      {}
+func (s *fakeSender) Update(m *Member)   {}
 func (s *fakeSender) Remove(id types.ID) {}
 func (s *fakeSender) Remove(id types.ID) {}
 func (s *fakeSender) Stop()              {}
 func (s *fakeSender) Stop()              {}
 
 
@@ -1017,6 +1018,41 @@ func TestRemoveMember(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestUpdateMember tests RemoveMember can propose and perform node update.
+func TestUpdateMember(t *testing.T) {
+	n := newNodeConfChangeCommitterRecorder()
+	n.readyc <- raft.Ready{
+		SoftState: &raft.SoftState{
+			RaftState: raft.StateLeader,
+			Nodes:     []uint64{1234, 2345, 3456},
+		},
+	}
+	cl := newTestCluster([]*Member{{ID: 1234}})
+	s := &EtcdServer{
+		node:    n,
+		store:   &storeRecorder{},
+		sender:  &nopSender{},
+		storage: &storageRecorder{},
+		Cluster: cl,
+	}
+	s.start()
+	wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
+	err := s.UpdateMember(context.TODO(), wm)
+	gaction := n.Action()
+	s.Stop()
+
+	if err != nil {
+		t.Fatalf("UpdateMember error: %v", err)
+	}
+	wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}}
+	if !reflect.DeepEqual(gaction, wactions) {
+		t.Errorf("action = %v, want %v", gaction, wactions)
+	}
+	if !reflect.DeepEqual(cl.Member(1234), &wm) {
+		t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
+	}
+}
+
 // TODO: test server could stop itself when being removed
 // TODO: test server could stop itself when being removed
 
 
 // TODO: test wait trigger correctness in multi-server case
 // TODO: test wait trigger correctness in multi-server case
@@ -1446,6 +1482,7 @@ type nopSender struct{}
 func (s *nopSender) Send(m []raftpb.Message) {}
 func (s *nopSender) Send(m []raftpb.Message) {}
 func (s *nopSender) Add(m *Member)           {}
 func (s *nopSender) Add(m *Member)           {}
 func (s *nopSender) Remove(id types.ID)      {}
 func (s *nopSender) Remove(id types.ID)      {}
+func (s *nopSender) Update(m *Member)        {}
 func (s *nopSender) Stop()                   {}
 func (s *nopSender) Stop()                   {}
 
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {

+ 2 - 0
raft/node.go

@@ -271,6 +271,8 @@ func (n *node) run(r *raft) {
 				r.addNode(cc.NodeID)
 				r.addNode(cc.NodeID)
 			case pb.ConfChangeRemoveNode:
 			case pb.ConfChangeRemoveNode:
 				r.removeNode(cc.NodeID)
 				r.removeNode(cc.NodeID)
+			case pb.ConfChangeUpdateNode:
+				r.resetPendingConf()
 			default:
 			default:
 				panic("unexpected conf type")
 				panic("unexpected conf type")
 			}
 			}

+ 3 - 0
raft/raftpb/raft.pb.go

@@ -120,15 +120,18 @@ type ConfChangeType int32
 const (
 const (
 	ConfChangeAddNode    ConfChangeType = 0
 	ConfChangeAddNode    ConfChangeType = 0
 	ConfChangeRemoveNode ConfChangeType = 1
 	ConfChangeRemoveNode ConfChangeType = 1
+	ConfChangeUpdateNode ConfChangeType = 2
 )
 )
 
 
 var ConfChangeType_name = map[int32]string{
 var ConfChangeType_name = map[int32]string{
 	0: "ConfChangeAddNode",
 	0: "ConfChangeAddNode",
 	1: "ConfChangeRemoveNode",
 	1: "ConfChangeRemoveNode",
+	2: "ConfChangeUpdateNode",
 }
 }
 var ConfChangeType_value = map[string]int32{
 var ConfChangeType_value = map[string]int32{
 	"ConfChangeAddNode":    0,
 	"ConfChangeAddNode":    0,
 	"ConfChangeRemoveNode": 1,
 	"ConfChangeRemoveNode": 1,
+	"ConfChangeUpdateNode": 2,
 }
 }
 
 
 func (x ConfChangeType) Enum() *ConfChangeType {
 func (x ConfChangeType) Enum() *ConfChangeType {

+ 1 - 0
raft/raftpb/raft.proto

@@ -60,6 +60,7 @@ message HardState {
 enum ConfChangeType {
 enum ConfChangeType {
 	ConfChangeAddNode    = 0;
 	ConfChangeAddNode    = 0;
 	ConfChangeRemoveNode = 1;
 	ConfChangeRemoveNode = 1;
+	ConfChangeUpdateNode = 2;
 }
 }
 
 
 message ConfChange {
 message ConfChange {