Browse Source

etcdserver: add AddNode, RemoveNode

AddNode and RemoveNode are used to propose config changes to the cluster.
If a proposal succeeds, the node is added to or removed from the cluster.
Yicheng Qin 11 years ago
parent
commit
aaffb9eb78

+ 168 - 0
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -10,6 +10,7 @@
 
 
 	It has these top-level messages:
 	It has these top-level messages:
 		Request
 		Request
+		Config
 */
 */
 package etcdserverpb
 package etcdserverpb
 
 
@@ -50,6 +51,18 @@ func (m *Request) Reset()         { *m = Request{} }
 func (m *Request) String() string { return proto.CompactTextString(m) }
 func (m *Request) String() string { return proto.CompactTextString(m) }
 func (*Request) ProtoMessage()    {}
 func (*Request) ProtoMessage()    {}
 
 
+// Config is the protobuf message describing a configuration change.
+//
+// Per the struct tags, Id, Type and NodeID are required varint fields
+// (numbers 1-3) and Context is an optional bytes field (number 4).
+// XXX_unrecognized keeps the raw bytes of unknown fields collected by
+// Unmarshal so that MarshalTo can write them back out.
+type Config struct {
+	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
+	Type             int64  `protobuf:"varint,2,req,name=type" json:"type"`
+	NodeID           int64  `protobuf:"varint,3,req,name=nodeID" json:"nodeID"`
+	Context          []byte `protobuf:"bytes,4,opt,name=context" json:"context"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+// Reset overwrites m with the zero Config.
+func (m *Config) Reset()         { *m = Config{} }
+// String renders m with proto.CompactTextString.
+func (m *Config) String() string { return proto.CompactTextString(m) }
+// ProtoMessage is an empty marker method (presumably required by the proto
+// message interface — confirm against the proto package in use).
+func (*Config) ProtoMessage()    {}
+
 func init() {
 func init() {
 }
 }
 func (m *Request) Unmarshal(data []byte) error {
 func (m *Request) Unmarshal(data []byte) error {
@@ -360,6 +373,115 @@ func (m *Request) Unmarshal(data []byte) error {
 	}
 	}
 	return nil
 	return nil
 }
 }
+// Unmarshal decodes the protobuf wire-format bytes in data into m.
+//
+// Fields 1-3 (Id, Type, NodeID) are read as varints and field 4 (Context) as
+// a length-delimited byte string. Any other field is skipped and its raw bytes
+// are appended to XXX_unrecognized. Decoded varints are OR-ed into the current
+// field values and Context is appended to, so m should normally be a zero
+// Config when this is called.
+//
+// It returns io.ErrUnexpectedEOF when data is truncated or a length prefix is
+// invalid, ErrWrongType when a known field carries an unexpected wire type,
+// and any error reported by Skip for unknown fields.
+func (m *Config) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		// Read the field key: (fieldNum << 3) | wireType, varint-encoded.
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Id |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Type |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.NodeID |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			// An oversized varint can set the sign bit of byteLen, and a huge
+			// positive length could overflow index+byteLen. Either would make the
+			// slice expression below panic, so validate against the bytes that
+			// actually remain (index <= l holds here, so l-index is >= 0).
+			if byteLen < 0 || byteLen > l-index {
+				return io.ErrUnexpectedEOF
+			}
+			postIndex := index + byteLen
+			m.Context = append(m.Context, data[index:postIndex]...)
+			index = postIndex
+		default:
+			// Unknown field: rewind to the start of its key so the key bytes are
+			// preserved together with the value in XXX_unrecognized.
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
 func (m *Request) Size() (n int) {
 func (m *Request) Size() (n int) {
 	var l int
 	var l int
 	_ = l
 	_ = l
@@ -389,6 +511,19 @@ func (m *Request) Size() (n int) {
 	}
 	}
 	return n
 	return n
 }
 }
+func (m *Config) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovEtcdserver(uint64(m.Id))
+	n += 1 + sovEtcdserver(uint64(m.Type))
+	n += 1 + sovEtcdserver(uint64(m.NodeID))
+	l = len(m.Context)
+	n += 1 + l + sovEtcdserver(uint64(l))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
 
 
 func sovEtcdserver(x uint64) (n int) {
 func sovEtcdserver(x uint64) (n int) {
 	for {
 	for {
@@ -504,6 +639,39 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
 	}
 	}
 	return i, nil
 	return i, nil
 }
 }
+// Marshal encodes m into a freshly allocated buffer of m.Size() bytes and
+// returns the portion that MarshalTo reports as written.
+func (m *Config) Marshal() (data []byte, err error) {
+	buf := make([]byte, m.Size())
+	written, err := m.MarshalTo(buf)
+	if err != nil {
+		return nil, err
+	}
+	return buf[:written], nil
+}
+
+func (m *Config) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintEtcdserver(data, i, uint64(m.Id))
+	data[i] = 0x10
+	i++
+	i = encodeVarintEtcdserver(data, i, uint64(m.Type))
+	data[i] = 0x18
+	i++
+	i = encodeVarintEtcdserver(data, i, uint64(m.NodeID))
+	data[i] = 0x22
+	i++
+	i = encodeVarintEtcdserver(data, i, uint64(len(m.Context)))
+	i += copy(data[i:], m.Context)
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 	data[offset] = uint8(v)
 	data[offset] = uint8(v)
 	data[offset+1] = uint8(v >> 8)
 	data[offset+1] = uint8(v >> 8)

+ 7 - 0
etcdserver/etcdserverpb/etcdserver.proto

@@ -24,3 +24,10 @@ message Request {
 	required bool   quorum     = 14 [(gogoproto.nullable) = false];
 	required bool   quorum     = 14 [(gogoproto.nullable) = false];
 	required int64  time       = 15 [(gogoproto.nullable) = false];
 	required int64  time       = 15 [(gogoproto.nullable) = false];
 }
 }
+
+// Config describes a configuration change: id identifies the request, type
+// selects the kind of change, nodeID names the affected node and context is
+// optional opaque data supplied by the caller.
+message Config {
+	required int64 id      = 1 [(gogoproto.nullable) = false];
+	required int64 type    = 2 [(gogoproto.nullable) = false];
+	required int64 nodeID  = 3 [(gogoproto.nullable) = false];
+	optional bytes context = 4 [(gogoproto.nullable) = false];
+}

+ 75 - 6
etcdserver/server.go

@@ -19,6 +19,11 @@ const (
 	DefaultSnapCount   = 10000
 	DefaultSnapCount   = 10000
 )
 )
 
 
+// Kinds of configuration change stored in pb.Config.Type. The values are
+// marshaled into proposed entries, so they are spelled out explicitly.
+const (
+	configAddNode    int64 = 0
+	configRemoveNode int64 = 1
+)
+
 var (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrStopped       = errors.New("etcdserver: server stopped")
 	ErrStopped       = errors.New("etcdserver: server stopped")
@@ -121,11 +126,23 @@ func (s *EtcdServer) run() {
 			// care to apply entries in a single goroutine, and not
 			// care to apply entries in a single goroutine, and not
 			// race them.
 			// race them.
 			for _, e := range rd.CommittedEntries {
 			for _, e := range rd.CommittedEntries {
-				var r pb.Request
-				if err := r.Unmarshal(e.Data); err != nil {
-					panic("TODO: this is bad, what do we do about it?")
+				switch e.Type {
+				case raft.EntryNormal:
+					var r pb.Request
+					if err := r.Unmarshal(e.Data); err != nil {
+						panic("TODO: this is bad, what do we do about it?")
+					}
+					s.w.Trigger(r.Id, s.applyRequest(r))
+				case raft.EntryConfig:
+					var c pb.Config
+					if err := c.Unmarshal(e.Data); err != nil {
+						panic("TODO: this is bad, what do we do about it?")
+					}
+					s.applyConfig(c)
+					s.w.Trigger(c.Id, nil)
+				default:
+					panic("unsupported entry type")
 				}
 				}
-				s.w.Trigger(r.Id, s.apply(r))
 				appliedi = e.Index
 				appliedi = e.Index
 			}
 			}
 
 
@@ -218,6 +235,46 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 	}
 }
 }
 
 
+// AddNode proposes a configAddNode configuration change for the node with
+// the given id, attaching context as the proposal's Context bytes. It returns
+// whatever configure returns.
+func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
+	return s.configure(ctx, pb.Config{
+		Id:      GenID(),
+		Type:    configAddNode,
+		NodeID:  id,
+		Context: context,
+	})
+}
+
+// RemoveNode proposes a configRemoveNode configuration change for the node
+// with the given id. It returns whatever configure returns.
+func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
+	return s.configure(ctx, pb.Config{
+		Id:     GenID(),
+		Type:   configRemoveNode,
+		NodeID: id,
+	})
+}
+
+// configure sends configuration change through consensus then performs it.
+// It will block until the change is performed or there is an error.
+//
+// It returns the marshal error if r cannot be encoded, the error reported by
+// Node.Configure if the proposal is rejected, ctx.Err() if ctx is done before
+// the change is applied, and ErrStopped if the server stops first.
+func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error {
+	data, err := r.Marshal()
+	if err != nil {
+		log.Printf("marshal request %#v error: %v", r, err)
+		return err
+	}
+	// Register before proposing so the trigger fired when the entry is applied
+	// cannot be missed.
+	ch := s.w.Register(r.Id)
+	if err := s.Node.Configure(ctx, data); err != nil {
+		// The proposal was not accepted, so nothing will ever trigger this id.
+		s.w.Trigger(r.Id, nil) // GC wait
+		return err
+	}
+	select {
+	case <-ch:
+		return nil
+	case <-ctx.Done():
+		s.w.Trigger(r.Id, nil) // GC wait
+		return ctx.Err()
+	case <-s.done:
+		return ErrStopped
+	}
+}
+
 // sync proposes a SYNC request and is non-blocking.
 // sync proposes a SYNC request and is non-blocking.
 // This makes no guarantee that the request will be proposed or performed.
 // This makes no guarantee that the request will be proposed or performed.
 // The request will be cancelled after the given timeout.
 // The request will be cancelled after the given timeout.
@@ -249,8 +306,8 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 	return t
 }
 }
 
 
-// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
-func (s *EtcdServer) apply(r pb.Request) Response {
+// applyRequest interprets r as a call to store.X and returns an Response interpreted from store.Event
+func (s *EtcdServer) applyRequest(r pb.Request) Response {
 	f := func(ev *store.Event, err error) Response {
 	f := func(ev *store.Event, err error) Response {
 		return Response{Event: ev, err: err}
 		return Response{Event: ev, err: err}
 	}
 	}
@@ -290,6 +347,18 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 	}
 	}
 }
 }
 
 
+// applyConfig performs the configuration change described by r on s.Node:
+// configAddNode adds r.NodeID and configRemoveNode removes it. Any other
+// type panics.
+func (s *EtcdServer) applyConfig(r pb.Config) {
+	if r.Type == configAddNode {
+		s.Node.AddNode(r.NodeID)
+		return
+	}
+	if r.Type == configRemoveNode {
+		s.Node.RemoveNode(r.NodeID)
+		return
+	}
+	// This should never be reached
+	panic("unsupported config type")
+}
+
 // TODO: non-blocking snapshot
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot() {
 func (s *EtcdServer) snapshot() {
 	d, err := s.Store.Save()
 	d, err := s.Store.Save()

+ 75 - 6
etcdserver/server_test.go

@@ -120,7 +120,7 @@ func TestDoBadLocalAction(t *testing.T) {
 	}
 	}
 }
 }
 
 
-func TestApply(t *testing.T) {
+func TestApplyRequest(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		req pb.Request
 		req pb.Request
 
 
@@ -188,7 +188,7 @@ func TestApply(t *testing.T) {
 	for i, tt := range tests {
 	for i, tt := range tests {
 		st := &storeRecorder{}
 		st := &storeRecorder{}
 		srv := &EtcdServer{Store: st}
 		srv := &EtcdServer{Store: st}
-		resp := srv.apply(tt.req)
+		resp := srv.applyRequest(tt.req)
 
 
 		if !reflect.DeepEqual(resp, tt.wresp) {
 		if !reflect.DeepEqual(resp, tt.wresp) {
 			t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
 			t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
@@ -594,6 +594,46 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestAddNode tests that AddNode proposes a configuration change and that
+// applying it adds the node to raft.
+func TestAddNode(t *testing.T) {
+	n := newNodeCommitterRecorder()
+	s := &EtcdServer{
+		Node:    n,
+		Store:   &storeRecorder{},
+		Send:    func(_ []raftpb.Message) {},
+		Storage: &storageRecorder{},
+	}
+	s.Start()
+	err := s.AddNode(context.TODO(), 1, []byte("foo"))
+	action := n.Action()
+	s.Stop()
+
+	// Reported with Errorf after Stop so the server is always shut down.
+	if err != nil {
+		t.Errorf("AddNode error = %v, want nil", err)
+	}
+	waction := []string{"Configure", "AddNode"}
+	if !reflect.DeepEqual(action, waction) {
+		t.Errorf("action = %v, want %v", action, waction)
+	}
+}
+
+// TestRemoveNode tests that RemoveNode proposes a configuration change and
+// that applying it removes the node from raft.
+func TestRemoveNode(t *testing.T) {
+	n := newNodeCommitterRecorder()
+	s := &EtcdServer{
+		Node:    n,
+		Store:   &storeRecorder{},
+		Send:    func(_ []raftpb.Message) {},
+		Storage: &storageRecorder{},
+	}
+	s.Start()
+	err := s.RemoveNode(context.TODO(), 1)
+	action := n.Action()
+	s.Stop()
+
+	// Reported with Errorf after Stop so the server is always shut down.
+	if err != nil {
+		t.Errorf("RemoveNode error = %v, want nil", err)
+	}
+	waction := []string{"Configure", "RemoveNode"}
+	if !reflect.DeepEqual(action, waction) {
+		t.Errorf("action = %v, want %v", action, waction)
+	}
+}
+
 // TODO: test wait trigger correctness in multi-server case
 // TODO: test wait trigger correctness in multi-server case
 
 
 func TestGetBool(t *testing.T) {
 func TestGetBool(t *testing.T) {
@@ -788,20 +828,27 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
 	n.record("Propose")
 	n.record("Propose")
 	return nil
 	return nil
 }
 }
-func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
-	n.record("Step")
+func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error {
+	n.record("Configure")
 	return nil
 	return nil
 }
 }
-func (n *nodeRecorder) Ready() <-chan raft.Ready {
-	n.record("Ready")
+func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
+	n.record("Step")
 	return nil
 	return nil
 }
 }
+// Ready returns a nil channel and records nothing; a receive on the nil
+// channel blocks forever.
+func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
 func (n *nodeRecorder) Stop() {
 func (n *nodeRecorder) Stop() {
 	n.record("Stop")
 	n.record("Stop")
 }
 }
 func (n *nodeRecorder) Compact(d []byte) {
 func (n *nodeRecorder) Compact(d []byte) {
 	n.record("Compact")
 	n.record("Compact")
 }
 }
+// AddNode records an "AddNode" action; id is ignored.
+func (n *nodeRecorder) AddNode(id int64) {
+	n.record("AddNode")
+}
+// RemoveNode records a "RemoveNode" action; id is ignored.
+func (n *nodeRecorder) RemoveNode(id int64) {
+	n.record("RemoveNode")
+}
 
 
 type nodeProposeDataRecorder struct {
 type nodeProposeDataRecorder struct {
 	nodeRecorder
 	nodeRecorder
@@ -832,3 +879,25 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
 	n.record("Propose blocked")
 	n.record("Propose blocked")
 	return nil
 	return nil
 }
 }
+
+// nodeCommitterRecorder is a nodeRecorder whose Propose and Configure push
+// the proposed data onto readyc as an already-committed entry, and whose
+// Ready exposes readyc.
+type nodeCommitterRecorder struct {
+	nodeRecorder
+	readyc chan raft.Ready
+}
+
+// newNodeCommitterRecorder returns a recorder whose one-slot ready channel is
+// pre-loaded with a Ready that reports the StateLeader soft state.
+func newNodeCommitterRecorder() *nodeCommitterRecorder {
+	rec := &nodeCommitterRecorder{readyc: make(chan raft.Ready, 1)}
+	rec.readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
+	return rec
+}
+// Propose sends data on readyc as a single committed entry of the default
+// type, then records the call through the embedded nodeRecorder.
+func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error {
+	committed := []raftpb.Entry{{Data: data}}
+	n.readyc <- raft.Ready{CommittedEntries: committed}
+	return n.nodeRecorder.Propose(ctx, data)
+}
+// Configure sends data on readyc as a single committed raft.EntryConfig
+// entry, then records the call through the embedded nodeRecorder.
+func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error {
+	committed := []raftpb.Entry{{Type: raft.EntryConfig, Data: data}}
+	n.readyc <- raft.Ready{CommittedEntries: committed}
+	return n.nodeRecorder.Configure(ctx, data)
+}
+// Ready returns the channel on which Propose and Configure deliver their
+// committed entries.
+func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready {
+	return n.readyc
+}