Browse Source

raft: add Configure, AddNode, RemoveNode

Configure is used to propose config change. AddNode and RemoveNode is
used to apply cluster change to raft state machine. They are the
basics for dynamic configuration.
Yicheng Qin 11 years ago
parent
commit
ff6705b94b
7 changed files with 225 additions and 1 deletions
  1. 11 0
      raft/doc.go
  2. 5 0
      raft/log.go
  3. 53 1
      raft/node.go
  4. 30 0
      raft/raft.go
  5. 105 0
      raft/raft_test.go
  6. 20 0
      raft/raftpb/raft.pb.go
  7. 1 0
      raft/raftpb/raft.proto

+ 11 - 0
raft/doc.go

@@ -61,5 +61,16 @@ data, serialize it into a byte slice and call:
 
 	n.Propose(ctx, data)
 
+To add or remove node in a cluster, serialize the data for configuration change
+into a byte slice and call:
+
+	n.Configure(ctx, data)
+
+For the safety consideration, one configuration should include at most one node
+change, which is applied through:
+
+	n.AddNode(id)
+	n.RemoveNode(id)
+
 */
 package raft

+ 5 - 0
raft/log.go

@@ -10,6 +10,11 @@ const (
 	defaultCompactThreshold = 10000
 )
 
+const (
+	EntryNormal int64 = iota
+	EntryConfig
+)
+
 type raftLog struct {
 	ents      []pb.Entry
 	unstable  int64

+ 53 - 1
raft/node.go

@@ -76,10 +76,12 @@ type Node interface {
 	// Tick increments the internal logical clock for the Node by a single tick. Election
 	// timeouts and heartbeat timeouts are in units of ticks.
 	Tick()
-	// Campaign causes the Node to transition to candidate state and start campaigning to become leader
+	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
 	Campaign(ctx context.Context) error
 	// Propose proposes that data be appended to the log.
 	Propose(ctx context.Context, data []byte) error
+	// Configure proposes config change. Only one config can be in the process of going through consensus at a time.
+	Configure(ctx context.Context, data []byte) error
 	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
 	Step(ctx context.Context, msg pb.Message) error
 	// Ready returns a channel that returns the current point-in-time state
@@ -88,6 +90,12 @@ type Node interface {
 	Stop()
 	// Compact
 	Compact(d []byte)
+	// AddNode adds a node with given id into peer list.
+	// TODO: reject existed node
+	AddNode(id int64)
+	// RemoveNode removes a node with give id from peer list.
+	// TODO: reject unexisted node
+	RemoveNode(id int64)
 }
 
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
@@ -114,11 +122,22 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.
 	return &n
 }
 
+const (
+	confAdd = iota
+	confRemove
+)
+
+type conf struct {
+	typ int
+	id  int64
+}
+
 // node is the canonical implementation of the Node interface
 type node struct {
 	propc    chan pb.Message
 	recvc    chan pb.Message
 	compactc chan []byte
+	confc    chan conf
 	readyc   chan Ready
 	tickc    chan struct{}
 	done     chan struct{}
@@ -129,6 +148,7 @@ func newNode() node {
 		propc:    make(chan pb.Message),
 		recvc:    make(chan pb.Message),
 		compactc: make(chan []byte),
+		confc:    make(chan conf),
 		readyc:   make(chan Ready),
 		tickc:    make(chan struct{}),
 		done:     make(chan struct{}),
@@ -167,6 +187,7 @@ func (n *node) run(r *raft) {
 		}
 
 		select {
+		// TODO: buffer the config propose if there exists one
 		case m := <-propc:
 			m.From = r.id
 			r.Step(m)
@@ -174,6 +195,15 @@ func (n *node) run(r *raft) {
 			r.Step(m) // raft never returns an error
 		case d := <-n.compactc:
 			r.compact(d)
+		case c := <-n.confc:
+			switch c.typ {
+			case confAdd:
+				r.addNode(c.id)
+			case confRemove:
+				r.removeNode(c.id)
+			default:
+				panic("unexpected conf type")
+			}
 		case <-n.tickc:
 			r.tick()
 		case readyc <- rd:
@@ -186,6 +216,10 @@ func (n *node) run(r *raft) {
 			if !IsEmptySnap(rd.Snapshot) {
 				prevSnapi = rd.Snapshot.Index
 			}
+			// TODO(yichengq): we assume that all committed config
+			// entries will be applied to make things easy for now.
+			// TODO(yichengq): it may have race because applied is set
+			// before entries are applied.
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetUnstable()
 			r.msgs = nil
@@ -212,6 +246,10 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
 	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
 }
 
+func (n *node) Configure(ctx context.Context, data []byte) error {
+	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig, Data: data}}})
+}
+
 // Step advances the state machine using msgs. The ctx.Err() will be returned,
 // if any.
 func (n *node) Step(ctx context.Context, m pb.Message) error {
@@ -241,6 +279,20 @@ func (n *node) Compact(d []byte) {
 	}
 }
 
+func (n *node) AddNode(id int64) {
+	select {
+	case n.confc <- conf{typ: confAdd, id: id}:
+	case <-n.done:
+	}
+}
+
+func (n *node) RemoveNode(id int64) {
+	select {
+	case n.confc <- conf{typ: confRemove, id: id}:
+	case <-n.done:
+	}
+}
+
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
 	rd := Ready{
 		Entries:          r.raftLog.unstableEnts(),

+ 30 - 0
raft/raft.go

@@ -105,6 +105,10 @@ type raft struct {
 	// the leader id
 	lead int64
 
+	// pending configuration
+	// New configuration is ignored if there exists configuration unapplied.
+	pendingConf bool
+
 	elapsed          int // number of ticks since the last msg
 	heartbeatTimeout int
 	electionTimeout  int
@@ -245,6 +249,7 @@ func (r *raft) reset(term int64) {
 			r.prs[i].match = r.raftLog.lastIndex()
 		}
 	}
+	r.pendingConf = false
 }
 
 func (r *raft) q() int {
@@ -308,6 +313,15 @@ func (r *raft) becomeLeader() {
 	r.tick = r.tickHeartbeat
 	r.lead = r.id
 	r.state = StateLeader
+	for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
+		if e.Type != EntryConfig {
+			continue
+		}
+		if r.pendingConf {
+			panic("unexpected double uncommitted config entry")
+		}
+		r.pendingConf = true
+	}
 	r.appendEntry(pb.Entry{Data: nil})
 }
 
@@ -373,6 +387,16 @@ func (r *raft) handleSnapshot(m pb.Message) {
 	}
 }
 
+func (r *raft) addNode(id int64) {
+	r.setProgress(id, 0, r.raftLog.lastIndex()+1)
+	r.pendingConf = false
+}
+
+func (r *raft) removeNode(id int64) {
+	r.delProgress(id)
+	r.pendingConf = false
+}
+
 type stepFunc func(r *raft, m pb.Message)
 
 func stepLeader(r *raft, m pb.Message) {
@@ -384,6 +408,12 @@ func stepLeader(r *raft, m pb.Message) {
 			panic("unexpected length(entries) of a msgProp")
 		}
 		e := m.Entries[0]
+		if e.Type == EntryConfig {
+			if r.pendingConf {
+				return
+			}
+			r.pendingConf = true
+		}
 		r.appendEntry(e)
 		r.bcastAppend()
 	case msgAppResp:

+ 105 - 0
raft/raft_test.go

@@ -948,6 +948,111 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 }
 
+// TestStepConfig tests that when raft step msgProp in ConfigEntry type,
+// it appends the entry to log and sets pendingConf to be true.
+func TestStepConfig(t *testing.T) {
+	// a raft that cannot make progress
+	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	index := r.raftLog.lastIndex()
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	if g := r.raftLog.lastIndex(); g != index+1 {
+		t.Errorf("index = %d, want %d", g, index+1)
+	}
+	if r.pendingConf != true {
+		t.Errorf("pendingConf = %v, want true", r.pendingConf)
+	}
+}
+
+// TestStepIgnoreConfig tests that if raft step the second msgProp in
+// ConfigEntry type when the first one is uncommitted, the node will deny
+// the proposal and keep its original state.
+func TestStepIgnoreConfig(t *testing.T) {
+	// a raft that cannot make progress
+	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	index := r.raftLog.lastIndex()
+	pendingConf := r.pendingConf
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	if g := r.raftLog.lastIndex(); g != index {
+		t.Errorf("index = %d, want %d", g, index)
+	}
+	if r.pendingConf != pendingConf {
+		t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
+	}
+}
+
+// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
+// based on uncommitted entries.
+func TestRecoverPendingConfig(t *testing.T) {
+	tests := []struct {
+		entType  int64
+		wpending bool
+	}{
+		{EntryNormal, false},
+		{EntryConfig, true},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, []int64{1, 2}, 0, 0)
+		r.appendEntry(pb.Entry{Type: tt.entType})
+		r.becomeCandidate()
+		r.becomeLeader()
+		if r.pendingConf != tt.wpending {
+			t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
+		}
+	}
+}
+
+// TestRecoverDoublePendingConfig tests that new leader will panic if
+// there exist two uncommitted config entries.
+func TestRecoverDoublePendingConfig(t *testing.T) {
+	func() {
+		defer func() {
+			if err := recover(); err == nil {
+				t.Errorf("expect panic, but nothing happens")
+			}
+		}()
+		r := newRaft(1, []int64{1, 2}, 0, 0)
+		r.appendEntry(pb.Entry{Type: EntryConfig})
+		r.appendEntry(pb.Entry{Type: EntryConfig})
+		r.becomeCandidate()
+		r.becomeLeader()
+	}()
+}
+
+// TestAddNode tests that addNode could update pendingConf and peer list correctly.
+func TestAddNode(t *testing.T) {
+	r := newRaft(1, []int64{1}, 0, 0)
+	r.pendingConf = true
+	r.addNode(2)
+	if r.pendingConf != false {
+		t.Errorf("pendingConf = %v, want false", r.pendingConf)
+	}
+	nodes := r.nodes()
+	sort.Sort(int64Slice(nodes))
+	wnodes := []int64{1, 2}
+	if !reflect.DeepEqual(nodes, wnodes) {
+		t.Errorf("nodes = %v, want %v", nodes, wnodes)
+	}
+}
+
+// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly.
+func TestRemoveNode(t *testing.T) {
+	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r.pendingConf = true
+	r.removeNode(2)
+	if r.pendingConf != false {
+		t.Errorf("pendingConf = %v, want false", r.pendingConf)
+	}
+	w := []int64{1}
+	if g := r.nodes(); !reflect.DeepEqual(g, w) {
+		t.Errorf("nodes = %v, want %v", g, w)
+	}
+}
+
 func ents(terms ...int64) *raft {
 	ents := []pb.Entry{{}}
 	for _, term := range terms {

+ 20 - 0
raft/raftpb/raft.pb.go

@@ -41,6 +41,7 @@ func (m *Info) String() string { return proto.CompactTextString(m) }
 func (*Info) ProtoMessage()    {}
 
 type Entry struct {
+	Type             int64  `protobuf:"varint,1,req,name=type" json:"type"`
 	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
 	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
 	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data"`
@@ -169,6 +170,21 @@ func (m *Entry) Unmarshal(data []byte) error {
 		fieldNum := int32(wire >> 3)
 		wireType := int(wire & 0x7)
 		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Type |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		case 2:
 			if wireType != 0 {
 				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
@@ -648,6 +664,7 @@ func (m *Info) Size() (n int) {
 func (m *Entry) Size() (n int) {
 	var l int
 	_ = l
+	n += 1 + sovRaft(uint64(m.Type))
 	n += 1 + sovRaft(uint64(m.Term))
 	n += 1 + sovRaft(uint64(m.Index))
 	l = len(m.Data)
@@ -760,6 +777,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) {
 	_ = i
 	var l int
 	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintRaft(data, i, uint64(m.Type))
 	data[i] = 0x10
 	i++
 	i = encodeVarintRaft(data, i, uint64(m.Term))

+ 1 - 0
raft/raftpb/raft.proto

@@ -12,6 +12,7 @@ message Info {
 }
 
 message Entry {
+	required int64 type  = 1 [(gogoproto.nullable) = false];
 	required int64 term  = 2 [(gogoproto.nullable) = false];
 	required int64 index = 3 [(gogoproto.nullable) = false];
 	optional bytes data  = 4 [(gogoproto.nullable) = false];