Selaa lähdekoodia

Merge pull request #3857 from es-chow/remove-multinode-goroutine

raft: add an thread-unsafe Node: RawNode
Xiang Li 10 vuotta sitten
vanhempi
commit
a423a55b14
6 muutettua tiedostoa jossa 524 lisäystä ja 1076 poistoa
  1. 10 0
      raft/log.go
  2. 33 0
      raft/log_test.go
  3. 0 503
      raft/multinode.go
  4. 0 573
      raft/multinode_test.go
  5. 228 0
      raft/rawnode.go
  6. 253 0
      raft/rawnode_test.go

+ 10 - 0
raft/log.go

@@ -148,6 +148,16 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	return nil
 }
 
+// hasNextEnts reports whether any entries are available for execution. This is
+// a fast check that avoids the heavy raftLog.slice() in raftLog.nextEnts().
+func (l *raftLog) hasNextEnts() bool {
+	off := max(l.applied+1, l.firstIndex())
+	if l.committed+1 > off {
+		return true
+	}
+	return false
+}
+
 func (l *raftLog) snapshot() (pb.Snapshot, error) {
 	if l.unstable.snapshot != nil {
 		return *l.unstable.snapshot, nil

+ 33 - 0
raft/log_test.go

@@ -338,6 +338,39 @@ func TestCompactionSideEffects(t *testing.T) {
 	}
 }
 
+func TestHasNextEnts(t *testing.T) {
+	snap := pb.Snapshot{
+		Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
+	}
+	ents := []pb.Entry{
+		{Term: 1, Index: 4},
+		{Term: 1, Index: 5},
+		{Term: 1, Index: 6},
+	}
+	tests := []struct {
+		applied uint64
+		hasNext bool
+	}{
+		{0, true},
+		{3, true},
+		{4, true},
+		{5, false},
+	}
+	for i, tt := range tests {
+		storage := NewMemoryStorage()
+		storage.ApplySnapshot(snap)
+		raftLog := newLog(storage, raftLogger)
+		raftLog.append(ents...)
+		raftLog.maybeCommit(5, 1)
+		raftLog.appliedTo(tt.applied)
+
+		hasNext := raftLog.hasNextEnts()
+		if hasNext != tt.hasNext {
+			t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext)
+		}
+	}
+}
+
 func TestNextEnts(t *testing.T) {
 	snap := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},

+ 0 - 503
raft/multinode.go

@@ -1,503 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package raft
-
-import (
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	pb "github.com/coreos/etcd/raft/raftpb"
-)
-
-// MultiNode represents a node that is participating in multiple consensus groups.
-// A MultiNode is more efficient than a collection of Nodes.
-// The methods of this interface correspond to the methods of Node and are described
-// more fully there.
-type MultiNode interface {
-	// CreateGroup adds a new group to the MultiNode. The application must call CreateGroup
-	// on each particpating node with the same group ID; it may create groups on demand as it
-	// receives messages. If the given storage contains existing log entries the list of peers
-	// may be empty. If Config.ID field is zero it will be replaced by the ID passed
-	// to StartMultiNode.
-	CreateGroup(group uint64, c *Config, peers []Peer) error
-	// RemoveGroup removes a group from the MultiNode.
-	RemoveGroup(group uint64) error
-	// Tick advances the internal logical clock by a single tick.
-	Tick()
-	// Campaign causes this MultiNode to transition to candidate state in the given group.
-	Campaign(ctx context.Context, group uint64) error
-	// Propose proposes that data be appended to the given group's log.
-	Propose(ctx context.Context, group uint64, data []byte) error
-	// ProposeConfChange proposes a config change.
-	ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error
-	// ApplyConfChange applies a config change to the local node.
-	ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState
-	// Step advances the state machine using the given message.
-	Step(ctx context.Context, group uint64, msg pb.Message) error
-	// Ready returns a channel that returns the current point-in-time state of any ready
-	// groups. Only groups with something to report will appear in the map.
-	Ready() <-chan map[uint64]Ready
-	// Advance notifies the node that the application has applied and saved progress in the
-	// last Ready results. It must be called with the last value returned from the Ready()
-	// channel.
-	Advance(map[uint64]Ready)
-	// Status returns the current status of the given group. Returns nil if no such group
-	// exists.
-	Status(group uint64) *Status
-	// Report reports the given node is not reachable for the last send.
-	ReportUnreachable(id, groupID uint64)
-	// ReportSnapshot reports the stutus of the sent snapshot.
-	ReportSnapshot(id, groupID uint64, status SnapshotStatus)
-	// Stop performs any necessary termination of the MultiNode.
-	Stop()
-}
-
-// StartMultiNode creates a MultiNode and starts its background
-// goroutine. If id is non-zero it identifies this node and will be
-// used as its node ID in all groups. The election and heartbeat
-// timers are in units of ticks.
-func StartMultiNode(id uint64) MultiNode {
-	mn := newMultiNode(id)
-	go mn.run()
-	return &mn
-}
-
-// TODO(bdarnell): add group ID to the underlying protos?
-type multiMessage struct {
-	group uint64
-	msg   pb.Message
-}
-
-type multiConfChange struct {
-	group uint64
-	msg   pb.ConfChange
-	ch    chan pb.ConfState
-}
-
-type multiStatus struct {
-	group uint64
-	ch    chan *Status
-}
-
-type groupCreation struct {
-	id     uint64
-	config *Config
-	peers  []Peer
-	// TODO(bdarnell): do we really need the done channel here? It's
-	// unlike the rest of this package, but we need the group creation
-	// to be complete before any Propose or other calls.
-	done chan struct{}
-}
-
-type groupRemoval struct {
-	id uint64
-	// TODO(bdarnell): see comment on groupCreation.done
-	done chan struct{}
-}
-
-type multiNode struct {
-	id       uint64
-	groupc   chan groupCreation
-	rmgroupc chan groupRemoval
-	propc    chan multiMessage
-	recvc    chan multiMessage
-	confc    chan multiConfChange
-	readyc   chan map[uint64]Ready
-	advancec chan map[uint64]Ready
-	tickc    chan struct{}
-	stop     chan struct{}
-	done     chan struct{}
-	status   chan multiStatus
-}
-
-func newMultiNode(id uint64) multiNode {
-	return multiNode{
-		id:       id,
-		groupc:   make(chan groupCreation),
-		rmgroupc: make(chan groupRemoval),
-		propc:    make(chan multiMessage),
-		recvc:    make(chan multiMessage),
-		confc:    make(chan multiConfChange),
-		readyc:   make(chan map[uint64]Ready),
-		advancec: make(chan map[uint64]Ready),
-		tickc:    make(chan struct{}),
-		stop:     make(chan struct{}),
-		done:     make(chan struct{}),
-		status:   make(chan multiStatus),
-	}
-}
-
-type groupState struct {
-	id         uint64
-	raft       *raft
-	prevSoftSt *SoftState
-	prevHardSt pb.HardState
-	prevSnapi  uint64
-}
-
-func (g *groupState) newReady() Ready {
-	return newReady(g.raft, g.prevSoftSt, g.prevHardSt)
-}
-
-func (g *groupState) commitReady(rd Ready) {
-	if rd.SoftState != nil {
-		g.prevSoftSt = rd.SoftState
-	}
-	if !IsEmptyHardState(rd.HardState) {
-		g.prevHardSt = rd.HardState
-	}
-	if g.prevHardSt.Commit != 0 {
-		// In most cases, prevHardSt and rd.HardState will be the same
-		// because when there are new entries to apply we just sent a
-		// HardState with an updated Commit value. However, on initial
-		// startup the two are different because we don't send a HardState
-		// until something changes, but we do send any un-applied but
-		// committed entries (and previously-committed entries may be
-		// incorporated into the snapshot, even if rd.CommittedEntries is
-		// empty). Therefore we mark all committed entries as applied
-		// whether they were included in rd.HardState or not.
-		g.raft.raftLog.appliedTo(g.prevHardSt.Commit)
-	}
-	if len(rd.Entries) > 0 {
-		e := rd.Entries[len(rd.Entries)-1]
-		g.raft.raftLog.stableTo(e.Index, e.Term)
-	}
-	if !IsEmptySnap(rd.Snapshot) {
-		g.prevSnapi = rd.Snapshot.Metadata.Index
-		g.raft.raftLog.stableSnapTo(g.prevSnapi)
-	}
-}
-
-func (mn *multiNode) run() {
-	groups := map[uint64]*groupState{}
-	rds := map[uint64]Ready{}
-	var advancec chan map[uint64]Ready
-	for {
-		// Only select readyc if we have something to report and we are not
-		// currently waiting for an advance.
-		readyc := mn.readyc
-		if len(rds) == 0 || advancec != nil {
-			readyc = nil
-		}
-
-		// group points to the group that was touched on this iteration (if any)
-		var group *groupState
-		select {
-		case gc := <-mn.groupc:
-			if (gc.config.ID != mn.id) && (gc.config.ID != 0 && mn.id != 0) {
-				panic("if gc.config.ID and mn.id differ, one of them must be zero")
-			}
-			if gc.config.ID == 0 {
-				gc.config.ID = mn.id
-			}
-			r := newRaft(gc.config)
-			group = &groupState{
-				id:   gc.id,
-				raft: r,
-			}
-			groups[gc.id] = group
-			lastIndex, err := gc.config.Storage.LastIndex()
-			if err != nil {
-				panic(err) // TODO(bdarnell)
-			}
-			// If the log is empty, this is a new group (like StartNode); otherwise it's
-			// restoring an existing group (like RestartNode).
-			// TODO(bdarnell): rethink group initialization and whether the application needs
-			// to be able to tell us when it expects the group to exist.
-			if lastIndex == 0 {
-				r.becomeFollower(1, None)
-				ents := make([]pb.Entry, len(gc.peers))
-				for i, peer := range gc.peers {
-					cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
-					data, err := cc.Marshal()
-					if err != nil {
-						panic("unexpected marshal error")
-					}
-					ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
-				}
-				r.raftLog.append(ents...)
-				r.raftLog.committed = uint64(len(ents))
-				for _, peer := range gc.peers {
-					r.addNode(peer.ID)
-				}
-			}
-			// Set the initial hard and soft states after performing all initialization.
-			group.prevSoftSt = r.softState()
-			group.prevHardSt = r.HardState
-			close(gc.done)
-
-		case gr := <-mn.rmgroupc:
-			delete(groups, gr.id)
-			delete(rds, gr.id)
-			close(gr.done)
-
-		case mm := <-mn.propc:
-			// TODO(bdarnell): single-node impl doesn't read from propc unless the group
-			// has a leader; we can't do that since we have one propc for many groups.
-			// We'll have to buffer somewhere on a group-by-group basis, or just let
-			// raft.Step drop any such proposals on the floor.
-			var ok bool
-			if group, ok = groups[mm.group]; ok {
-				mm.msg.From = group.raft.id
-				group.raft.Step(mm.msg)
-			}
-
-		case mm := <-mn.recvc:
-			group = groups[mm.group]
-			if _, ok := group.raft.prs[mm.msg.From]; ok || !IsResponseMsg(mm.msg) {
-				group.raft.Step(mm.msg)
-			}
-
-		case mcc := <-mn.confc:
-			group = groups[mcc.group]
-			if mcc.msg.NodeID == None {
-				group.raft.resetPendingConf()
-				select {
-				case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
-				case <-mn.done:
-				}
-				break
-			}
-			switch mcc.msg.Type {
-			case pb.ConfChangeAddNode:
-				group.raft.addNode(mcc.msg.NodeID)
-			case pb.ConfChangeRemoveNode:
-				group.raft.removeNode(mcc.msg.NodeID)
-			case pb.ConfChangeUpdateNode:
-				group.raft.resetPendingConf()
-			default:
-				panic("unexpected conf type")
-			}
-			select {
-			case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
-			case <-mn.done:
-			}
-
-		case <-mn.tickc:
-			// TODO(bdarnell): instead of calling every group on every tick,
-			// we should have a priority queue of groups based on their next
-			// time-based event.
-			for _, g := range groups {
-				g.raft.tick()
-				rd := g.newReady()
-				if rd.containsUpdates() {
-					rds[g.id] = rd
-				}
-			}
-
-		case readyc <- rds:
-			// Clear outgoing messages as soon as we've passed them to the application.
-			for g := range rds {
-				groups[g].raft.msgs = nil
-			}
-			rds = map[uint64]Ready{}
-			advancec = mn.advancec
-
-		case advs := <-advancec:
-			for groupID, rd := range advs {
-				g, ok := groups[groupID]
-				if !ok {
-					continue
-				}
-				g.commitReady(rd)
-
-				// We've been accumulating new entries in rds which may now be obsolete.
-				// Drop the old Ready object and create a new one if needed.
-				delete(rds, groupID)
-				newRd := g.newReady()
-				if newRd.containsUpdates() {
-					rds[groupID] = newRd
-				}
-			}
-			advancec = nil
-
-		case ms := <-mn.status:
-			if g, ok := groups[ms.group]; ok {
-				s := getStatus(g.raft)
-				ms.ch <- &s
-			} else {
-				ms.ch <- nil
-			}
-
-		case <-mn.stop:
-			close(mn.done)
-			return
-		}
-
-		if group != nil {
-			rd := group.newReady()
-			if rd.containsUpdates() {
-				rds[group.id] = rd
-			}
-		}
-	}
-}
-
-func (mn *multiNode) CreateGroup(id uint64, config *Config, peers []Peer) error {
-	gc := groupCreation{
-		id:     id,
-		config: config,
-		peers:  peers,
-		done:   make(chan struct{}),
-	}
-	mn.groupc <- gc
-	select {
-	case <-gc.done:
-		return nil
-	case <-mn.done:
-		return ErrStopped
-	}
-}
-
-func (mn *multiNode) RemoveGroup(id uint64) error {
-	gr := groupRemoval{
-		id:   id,
-		done: make(chan struct{}),
-	}
-	mn.rmgroupc <- gr
-	select {
-	case <-gr.done:
-		return nil
-	case <-mn.done:
-		return ErrStopped
-	}
-}
-
-func (mn *multiNode) Stop() {
-	select {
-	case mn.stop <- struct{}{}:
-	case <-mn.done:
-	}
-	<-mn.done
-}
-
-func (mn *multiNode) Tick() {
-	select {
-	case mn.tickc <- struct{}{}:
-	case <-mn.done:
-	}
-}
-
-func (mn *multiNode) Campaign(ctx context.Context, group uint64) error {
-	return mn.step(ctx, multiMessage{group,
-		pb.Message{
-			Type: pb.MsgHup,
-		},
-	})
-}
-
-func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error {
-	return mn.step(ctx, multiMessage{group,
-		pb.Message{
-			Type: pb.MsgProp,
-			Entries: []pb.Entry{
-				{Data: data},
-			},
-		}})
-}
-
-func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error {
-	data, err := cc.Marshal()
-	if err != nil {
-		return err
-	}
-	return mn.Step(ctx, group,
-		pb.Message{
-			Type: pb.MsgProp,
-			Entries: []pb.Entry{
-				{Type: pb.EntryConfChange, Data: data},
-			},
-		})
-}
-
-func (mn *multiNode) step(ctx context.Context, m multiMessage) error {
-	ch := mn.recvc
-	if m.msg.Type == pb.MsgProp {
-		ch = mn.propc
-	}
-
-	select {
-	case ch <- m:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-mn.done:
-		return ErrStopped
-	}
-}
-
-func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState {
-	mcc := multiConfChange{group, cc, make(chan pb.ConfState)}
-	select {
-	case mn.confc <- mcc:
-	case <-mn.done:
-	}
-	select {
-	case cs := <-mcc.ch:
-		return &cs
-	case <-mn.done:
-		// Per comments on Node.ApplyConfChange, this method should never return nil.
-		return &pb.ConfState{}
-	}
-}
-
-func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error {
-	// ignore unexpected local messages receiving over network
-	if IsLocalMsg(m) {
-		// TODO: return an error?
-		return nil
-	}
-	return mn.step(ctx, multiMessage{group, m})
-}
-
-func (mn *multiNode) Ready() <-chan map[uint64]Ready {
-	return mn.readyc
-}
-
-func (mn *multiNode) Advance(rds map[uint64]Ready) {
-	select {
-	case mn.advancec <- rds:
-	case <-mn.done:
-	}
-}
-
-func (mn *multiNode) Status(group uint64) *Status {
-	ms := multiStatus{
-		group: group,
-		ch:    make(chan *Status),
-	}
-	mn.status <- ms
-	return <-ms.ch
-}
-
-func (mn *multiNode) ReportUnreachable(id, groupID uint64) {
-	select {
-	case mn.recvc <- multiMessage{
-		group: groupID,
-		msg:   pb.Message{Type: pb.MsgUnreachable, From: id},
-	}:
-	case <-mn.done:
-	}
-}
-
-func (mn *multiNode) ReportSnapshot(id, groupID uint64, status SnapshotStatus) {
-	rej := status == SnapshotFailure
-
-	select {
-	case mn.recvc <- multiMessage{
-		group: groupID,
-		msg:   pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej},
-	}:
-	case <-mn.done:
-	}
-}

+ 0 - 573
raft/multinode_test.go

@@ -1,573 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package raft
-
-import (
-	"bytes"
-	"reflect"
-	"testing"
-	"time"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/raft/raftpb"
-)
-
-// TestMultiNodeStep ensures that multiNode.Step sends MsgProp to propc
-// chan and other kinds of messages to recvc chan.
-func TestMultiNodeStep(t *testing.T) {
-	for i, msgn := range raftpb.MessageType_name {
-		mn := &multiNode{
-			propc: make(chan multiMessage, 1),
-			recvc: make(chan multiMessage, 1),
-		}
-		msgt := raftpb.MessageType(i)
-		mn.Step(context.TODO(), 1, raftpb.Message{Type: msgt})
-		// Proposal goes to proc chan. Others go to recvc chan.
-		if msgt == raftpb.MsgProp {
-			select {
-			case <-mn.propc:
-			default:
-				t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
-			}
-		} else {
-			if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus || msgt == raftpb.MsgCheckQuorum {
-				select {
-				case <-mn.recvc:
-					t.Errorf("%d: step should ignore %s", msgt, msgn)
-				default:
-				}
-			} else {
-				select {
-				case <-mn.recvc:
-				default:
-					t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
-				}
-			}
-		}
-	}
-}
-
-// Cancel and Stop should unblock Step()
-func TestMultiNodeStepUnblock(t *testing.T) {
-	// a node without buffer to block step
-	mn := &multiNode{
-		propc: make(chan multiMessage),
-		done:  make(chan struct{}),
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-	stopFunc := func() { close(mn.done) }
-
-	tests := []struct {
-		unblock func()
-		werr    error
-	}{
-		{stopFunc, ErrStopped},
-		{cancel, context.Canceled},
-	}
-
-	for i, tt := range tests {
-		errc := make(chan error, 1)
-		go func() {
-			err := mn.Step(ctx, 1, raftpb.Message{Type: raftpb.MsgProp})
-			errc <- err
-		}()
-		tt.unblock()
-		select {
-		case err := <-errc:
-			if err != tt.werr {
-				t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
-			}
-			//clean up side-effect
-			if ctx.Err() != nil {
-				ctx = context.TODO()
-			}
-			select {
-			case <-mn.done:
-				mn.done = make(chan struct{})
-			default:
-			}
-		case <-time.After(time.Millisecond * 100):
-			t.Errorf("#%d: failed to unblock step", i)
-		}
-	}
-}
-
-// TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
-func TestMultiNodePropose(t *testing.T) {
-	mn := newMultiNode(1)
-	go mn.run()
-	s := NewMemoryStorage()
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
-	mn.Campaign(context.TODO(), 1)
-	proposed := false
-	for {
-		rds := <-mn.Ready()
-		rd := rds[1]
-		s.Append(rd.Entries)
-		// Once we are the leader, propose a command.
-		if !proposed && rd.SoftState.Lead == mn.id {
-			mn.Propose(context.TODO(), 1, []byte("somedata"))
-			proposed = true
-		}
-		mn.Advance(rds)
-
-		// Exit when we have three entries: one ConfChange, one no-op for the election,
-		// and our proposed command.
-		lastIndex, err := s.LastIndex()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if lastIndex >= 3 {
-			break
-		}
-	}
-	mn.Stop()
-
-	lastIndex, err := s.LastIndex()
-	if err != nil {
-		t.Fatal(err)
-	}
-	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(entries) != 1 {
-		t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
-	}
-	if !bytes.Equal(entries[0].Data, []byte("somedata")) {
-		t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
-	}
-}
-
-// TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange
-// sends the given configuration proposal to the underlying raft.
-func TestMultiNodeProposeConfig(t *testing.T) {
-	mn := newMultiNode(1)
-	go mn.run()
-	s := NewMemoryStorage()
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
-	mn.Campaign(context.TODO(), 1)
-	proposed := false
-	var lastIndex uint64
-	var ccdata []byte
-	for {
-		rds := <-mn.Ready()
-		rd := rds[1]
-		s.Append(rd.Entries)
-		// change the step function to appendStep until this raft becomes leader
-		if !proposed && rd.SoftState.Lead == mn.id {
-			cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
-			var err error
-			ccdata, err = cc.Marshal()
-			if err != nil {
-				t.Fatal(err)
-			}
-			mn.ProposeConfChange(context.TODO(), 1, cc)
-			proposed = true
-		}
-		mn.Advance(rds)
-
-		var err error
-		lastIndex, err = s.LastIndex()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if lastIndex >= 3 {
-			break
-		}
-	}
-	mn.Stop()
-
-	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(entries) != 1 {
-		t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
-	}
-	if entries[0].Type != raftpb.EntryConfChange {
-		t.Fatalf("type = %v, want %v", entries[0].Type, raftpb.EntryConfChange)
-	}
-	if !bytes.Equal(entries[0].Data, ccdata) {
-		t.Errorf("data = %v, want %v", entries[0].Data, ccdata)
-	}
-}
-
-// TestProposeUnknownGroup ensures that we gracefully handle proposals
-// for groups we don't know about (which can happen on a former leader
-// that has been removed from the group).
-//
-// It is analogous to TestBlockProposal from node_test.go but in
-// MultiNode we cannot block proposals based on individual group
-// leader status.
-func TestProposeUnknownGroup(t *testing.T) {
-	mn := newMultiNode(1)
-	go mn.run()
-	defer mn.Stop()
-
-	// A nil error from Propose() doesn't mean much. In this case the
-	// proposal will be dropped on the floor because we don't know
-	// anything about group 42. This is a very crude test that mainly
-	// guarantees that we don't panic in this case.
-	if err := mn.Propose(context.TODO(), 42, []byte("somedata")); err != nil {
-		t.Errorf("err = %v, want nil", err)
-	}
-}
-
-// TestProposeAfterRemoveLeader ensures that we gracefully handle
-// proposals that are attempted after a leader has been removed from
-// the active configuration, but before that leader has called
-// MultiNode.RemoveGroup.
-func TestProposeAfterRemoveLeader(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	mn := newMultiNode(1)
-	go mn.run()
-	defer mn.Stop()
-
-	storage := NewMemoryStorage()
-	if err := mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage),
-		[]Peer{{ID: 1}}); err != nil {
-		t.Fatal(err)
-	}
-	if err := mn.Campaign(ctx, 1); err != nil {
-		t.Fatal(err)
-	}
-
-	if err := mn.ProposeConfChange(ctx, 1, raftpb.ConfChange{
-		Type:   raftpb.ConfChangeRemoveNode,
-		NodeID: 1,
-	}); err != nil {
-		t.Fatal(err)
-	}
-	gs := <-mn.Ready()
-	g := gs[1]
-	if err := storage.Append(g.Entries); err != nil {
-		t.Fatal(err)
-	}
-	for _, e := range g.CommittedEntries {
-		if e.Type == raftpb.EntryConfChange {
-			var cc raftpb.ConfChange
-			if err := cc.Unmarshal(e.Data); err != nil {
-				t.Fatal(err)
-			}
-			mn.ApplyConfChange(1, cc)
-		}
-	}
-	mn.Advance(gs)
-
-	if err := mn.Propose(ctx, 1, []byte("somedata")); err != nil {
-		t.Errorf("err = %v, want nil", err)
-	}
-}
-
-// TestNodeTick from node_test.go has no equivalent in multiNode because
-// it reaches into the raft object which is not exposed.
-
-// TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped
-// processing, and that it is idempotent
-func TestMultiNodeStop(t *testing.T) {
-	mn := newMultiNode(1)
-	donec := make(chan struct{})
-
-	go func() {
-		mn.run()
-		close(donec)
-	}()
-
-	mn.Tick()
-	mn.Stop()
-
-	select {
-	case <-donec:
-	case <-time.After(time.Second):
-		t.Fatalf("timed out waiting for node to stop!")
-	}
-
-	// Further ticks should have no effect, the node is stopped.
-	// There is no way to verify this in multinode but at least we can test
-	// it doesn't block or panic.
-	mn.Tick()
-	// Subsequent Stops should have no effect.
-	mn.Stop()
-}
-
-// TestMultiNodeStart ensures that a node can be started correctly. The node should
-// start with correct configuration change entries, and can accept and commit
-// proposals.
-func TestMultiNodeStart(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
-	ccdata, err := cc.Marshal()
-	if err != nil {
-		t.Fatalf("unexpected marshal error: %v", err)
-	}
-	wants := []Ready{
-		{
-			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
-			HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
-			Entries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
-			},
-			CommittedEntries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
-			},
-		},
-		{
-			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
-			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-		},
-	}
-	mn := StartMultiNode(1)
-	storage := NewMemoryStorage()
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
-	mn.Campaign(ctx, 1)
-	gs := <-mn.Ready()
-	g := gs[1]
-	if !reflect.DeepEqual(g, wants[0]) {
-		t.Fatalf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
-	} else {
-		storage.Append(g.Entries)
-		mn.Advance(gs)
-	}
-
-	mn.Propose(ctx, 1, []byte("foo"))
-	if gs2 := <-mn.Ready(); !reflect.DeepEqual(gs2[1], wants[1]) {
-		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, gs2[1], wants[1])
-	} else {
-		storage.Append(gs2[1].Entries)
-		mn.Advance(gs2)
-	}
-
-	select {
-	case rd := <-mn.Ready():
-		t.Errorf("unexpected Ready: %+v", rd)
-	case <-time.After(time.Millisecond):
-	}
-}
-
-func TestMultiNodeRestart(t *testing.T) {
-	entries := []raftpb.Entry{
-		{Term: 1, Index: 1},
-		{Term: 1, Index: 2, Data: []byte("foo")},
-	}
-	st := raftpb.HardState{Term: 1, Commit: 1}
-
-	want := Ready{
-		HardState: emptyState,
-		// commit up to index commit index in st
-		CommittedEntries: entries[:st.Commit],
-	}
-
-	storage := NewMemoryStorage()
-	storage.SetHardState(st)
-	storage.Append(entries)
-	mn := StartMultiNode(1)
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), nil)
-	gs := <-mn.Ready()
-	if !reflect.DeepEqual(gs[1], want) {
-		t.Errorf("g = %+v,\n             w   %+v", gs[1], want)
-	}
-	mn.Advance(gs)
-
-	select {
-	case rd := <-mn.Ready():
-		t.Errorf("unexpected Ready: %+v", rd)
-	case <-time.After(time.Millisecond):
-	}
-	mn.Stop()
-}
-
-func TestMultiNodeRestartFromSnapshot(t *testing.T) {
-	snap := raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{
-			ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
-			Index:     2,
-			Term:      1,
-		},
-	}
-	entries := []raftpb.Entry{
-		{Term: 1, Index: 3, Data: []byte("foo")},
-	}
-	st := raftpb.HardState{Term: 1, Commit: 3}
-
-	want := Ready{
-		HardState: emptyState,
-		// commit up to index commit index in st
-		CommittedEntries: entries,
-	}
-
-	s := NewMemoryStorage()
-	s.SetHardState(st)
-	s.ApplySnapshot(snap)
-	s.Append(entries)
-	mn := StartMultiNode(1)
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), nil)
-	if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) {
-		t.Errorf("g = %+v,\n             w   %+v", gs[1], want)
-	} else {
-		mn.Advance(gs)
-	}
-
-	select {
-	case rd := <-mn.Ready():
-		t.Errorf("unexpected Ready: %+v", rd)
-	case <-time.After(time.Millisecond):
-	}
-}
-
-func TestMultiNodeAdvance(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	storage := NewMemoryStorage()
-	mn := StartMultiNode(1)
-	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
-	mn.Campaign(ctx, 1)
-	rd1 := <-mn.Ready()
-	mn.Propose(ctx, 1, []byte("foo"))
-	select {
-	case rd2 := <-mn.Ready():
-		t.Fatalf("unexpected Ready before Advance: %+v", rd2)
-	case <-time.After(time.Millisecond):
-	}
-	storage.Append(rd1[1].Entries)
-	mn.Advance(rd1)
-	select {
-	case <-mn.Ready():
-	case <-time.After(100 * time.Millisecond):
-		t.Errorf("expect Ready after Advance, but there is no Ready available")
-	}
-}
-
-func TestMultiNodeStatus(t *testing.T) {
-	storage := NewMemoryStorage()
-	mn := StartMultiNode(1)
-	err := mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
-	if err != nil {
-		t.Fatal(err)
-	}
-	status := mn.Status(1)
-	if status == nil {
-		t.Errorf("expected status struct, got nil")
-	}
-
-	status = mn.Status(2)
-	if status != nil {
-		t.Errorf("expected nil status, got %+v", status)
-	}
-}
-
-// TestMultiNodePerGroupID tests that MultiNode may have a different
-// node ID for each group, if and only if the Config.ID field is
-// filled in when calling CreateGroup.
-func TestMultiNodePerGroupID(t *testing.T) {
-	storage := NewMemoryStorage()
-	mn := StartMultiNode(0)
-
-	// Maps group ID to node ID.
-	groups := map[uint64]uint64{
-		1: 10,
-		2: 20,
-	}
-
-	// Create two groups.
-	for g, nodeID := range groups {
-		err := mn.CreateGroup(g, newTestConfig(nodeID, nil, 10, 1, storage),
-			[]Peer{{ID: nodeID}, {ID: nodeID + 1}, {ID: nodeID + 2}})
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	// Campaign on both groups.
-	for g := range groups {
-		err := mn.Campaign(context.Background(), g)
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	// All outgoing messages (two MsgVotes for each group) should have
-	// the correct From IDs.
-	var rd map[uint64]Ready
-	select {
-	case rd = <-mn.Ready():
-	case <-time.After(100 * time.Millisecond):
-		t.Fatal("timed out waiting for ready")
-	}
-	for g, nodeID := range groups {
-		if len(rd[g].Messages) != 2 {
-			t.Errorf("expected 2 messages in group %d; got %d", g, len(rd[g].Messages))
-		}
-
-		for _, m := range rd[g].Messages {
-			if m.From != nodeID {
-				t.Errorf("expected %s message in group %d to have From: %d; got %d",
-					m.Type, g, nodeID, m.From)
-			}
-		}
-	}
-	mn.Advance(rd)
-
-	// Become a follower in both groups.
-	for g, nodeID := range groups {
-		err := mn.Step(context.Background(), g, raftpb.Message{
-			Type: raftpb.MsgHeartbeat,
-			To:   nodeID,
-			From: nodeID + 1,
-		})
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	// Propose a command on each group (Propose is tested separately
-	// because proposals in follower mode go through a different code path).
-	for g := range groups {
-		err := mn.Propose(context.Background(), g, []byte("foo"))
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	// Validate that all outgoing messages (heartbeat response and
-	// proposal) have the correct From IDs.
-	select {
-	case rd = <-mn.Ready():
-	case <-time.After(100 * time.Millisecond):
-		t.Fatal("timed out waiting for ready")
-	}
-	for g, nodeID := range groups {
-		if len(rd[g].Messages) != 2 {
-			t.Errorf("expected 2 messages in group %d; got %d", g, len(rd[g].Messages))
-		}
-
-		for _, m := range rd[g].Messages {
-			if m.From != nodeID {
-				t.Errorf("expected %s message in group %d to have From: %d; got %d",
-					m.Type, g, nodeID, m.From)
-			}
-		}
-	}
-	mn.Advance(rd)
-}

+ 228 - 0
raft/rawnode.go

@@ -0,0 +1,228 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+	"errors"
+
+	pb "github.com/coreos/etcd/raft/raftpb"
+)
+
+// ErrStepLocalMsg is returned when trying to step a local raft message.
+var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")
+
+// ErrStepPeerNotFound is returned when trying to step a response message
+// but there is no peer found in raft.prs for that node.
+var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
+
+// RawNode is a thread-unsafe Node.
+// The methods of this struct correspond to the methods of Node and are described
+// more fully there.
+type RawNode struct {
+	raft       *raft
+	prevSoftSt *SoftState
+	prevHardSt pb.HardState
+}
+
+func (rn *RawNode) newReady() Ready {
+	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+func (rn *RawNode) commitReady(rd Ready) {
+	if rd.SoftState != nil {
+		rn.prevSoftSt = rd.SoftState
+	}
+	if !IsEmptyHardState(rd.HardState) {
+		rn.prevHardSt = rd.HardState
+	}
+	if rn.prevHardSt.Commit != 0 {
+		// In most cases, prevHardSt and rd.HardState will be the same
+		// because when there are new entries to apply we just sent a
+		// HardState with an updated Commit value. However, on initial
+		// startup the two are different because we don't send a HardState
+		// until something changes, but we do send any un-applied but
+		// committed entries (and previously-committed entries may be
+		// incorporated into the snapshot, even if rd.CommittedEntries is
+		// empty). Therefore we mark all committed entries as applied
+		// whether they were included in rd.HardState or not.
+		rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)
+	}
+	if len(rd.Entries) > 0 {
+		e := rd.Entries[len(rd.Entries)-1]
+		rn.raft.raftLog.stableTo(e.Index, e.Term)
+	}
+	if !IsEmptySnap(rd.Snapshot) {
+		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+	}
+}
+
+// NewRawNode returns a new RawNode given configuration and a list of raft peers.
+func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
+	if config.ID == 0 {
+		panic("config.ID must not be zero")
+	}
+	r := newRaft(config)
+	rn := &RawNode{
+		raft: r,
+	}
+	lastIndex, err := config.Storage.LastIndex()
+	if err != nil {
+		panic(err) // TODO(bdarnell)
+	}
+	// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
+	// restoring an existing RawNode (like RestartNode).
+	// TODO(bdarnell): rethink RawNode initialization and whether the application needs
+	// to be able to tell us when it expects the RawNode to exist.
+	if lastIndex == 0 {
+		r.becomeFollower(1, None)
+		ents := make([]pb.Entry, len(peers))
+		for i, peer := range peers {
+			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+			data, err := cc.Marshal()
+			if err != nil {
+				panic("unexpected marshal error")
+			}
+
+			ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+		}
+		r.raftLog.append(ents...)
+		r.raftLog.committed = uint64(len(ents))
+		for _, peer := range peers {
+			r.addNode(peer.ID)
+		}
+	}
+	// Set the initial hard and soft states after performing all initialization.
+	rn.prevSoftSt = r.softState()
+	rn.prevHardSt = r.HardState
+
+	return rn, nil
+}
+
+// Tick advances the internal logical clock by a single tick.
+func (rn *RawNode) Tick() {
+	rn.raft.tick()
+}
+
+// Campaign causes this RawNode to transition to candidate state.
+func (rn *RawNode) Campaign() error {
+	return rn.raft.Step(pb.Message{
+		Type: pb.MsgHup,
+	})
+}
+
+// Propose proposes data be appended to the raft log.
+func (rn *RawNode) Propose(data []byte) error {
+	return rn.raft.Step(pb.Message{
+		Type: pb.MsgProp,
+		From: rn.raft.id,
+		Entries: []pb.Entry{
+			{Data: data},
+		}})
+}
+
+// ProposeConfChange proposes a config change.
+func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
+	data, err := cc.Marshal()
+	if err != nil {
+		return err
+	}
+	return rn.raft.Step(pb.Message{
+		Type: pb.MsgProp,
+		Entries: []pb.Entry{
+			{Type: pb.EntryConfChange, Data: data},
+		},
+	})
+}
+
+// ApplyConfChange applies a config change to the local node.
+func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+	if cc.NodeID == None {
+		rn.raft.resetPendingConf()
+		return &pb.ConfState{Nodes: rn.raft.nodes()}
+	}
+	switch cc.Type {
+	case pb.ConfChangeAddNode:
+		rn.raft.addNode(cc.NodeID)
+	case pb.ConfChangeRemoveNode:
+		rn.raft.removeNode(cc.NodeID)
+	case pb.ConfChangeUpdateNode:
+		rn.raft.resetPendingConf()
+	default:
+		panic("unexpected conf type")
+	}
+	return &pb.ConfState{Nodes: rn.raft.nodes()}
+}
+
+// Step advances the state machine using the given message.
+func (rn *RawNode) Step(m pb.Message) error {
+	// ignore unexpected local messages receiving over network
+	if IsLocalMsg(m) {
+		return ErrStepLocalMsg
+	}
+	if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m) {
+		return rn.raft.Step(m)
+	}
+	return ErrStepPeerNotFound
+}
+
+// Ready returns the current point-in-time state of this RawNode.
+func (rn *RawNode) Ready() Ready {
+	rd := rn.newReady()
+	rn.raft.msgs = nil
+	return rd
+}
+
+// HasReady is called when a RawNode user needs to check whether any Ready is pending.
+// Checking logic in this method should be consistent with Ready.containsUpdates().
+func (rn *RawNode) HasReady() bool {
+	r := rn.raft
+	if !r.softState().equal(rn.prevSoftSt) {
+		return true
+	}
+	if !IsEmptyHardState(r.HardState) && !isHardStateEqual(r.HardState, rn.prevHardSt) {
+		return true
+	}
+	if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
+		return true
+	}
+	if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
+		return true
+	}
+	return false
+}
+
+// Advance notifies the RawNode that the application has applied and saved progress in the
+// last Ready results.
+func (rn *RawNode) Advance(rd Ready) {
+	rn.commitReady(rd)
+}
+
+// Status returns the current status of this RawNode.
+func (rn *RawNode) Status() *Status {
+	status := getStatus(rn.raft)
+	return &status
+}
+
+// ReportUnreachable reports the given node is not reachable for the last send.
+func (rn *RawNode) ReportUnreachable(id uint64) {
+	_ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
+}
+
+// ReportSnapshot reports the status of the sent snapshot.
+func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
+	rej := status == SnapshotFailure
+
+	_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
+}

+ 253 - 0
raft/rawnode_test.go

@@ -0,0 +1,253 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+// TestRawNodeStep ensures that RawNode.Step ignores local messages.
+func TestRawNodeStep(t *testing.T) {
+	for i, msgn := range raftpb.MessageType_name {
+		s := NewMemoryStorage()
+		rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
+		if err != nil {
+			t.Fatal(err)
+		}
+		msgt := raftpb.MessageType(i)
+		err = rawNode.Step(raftpb.Message{Type: msgt})
+		// LocalMsg should be ignored.
+		if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus {
+			if err != ErrStepLocalMsg {
+				t.Errorf("%d: step should ignore %s", msgt, msgn)
+			}
+		}
+	}
+}
+
+// TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
+// no goroutine in RawNode.
+
+// TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange
+// send the given proposal and ConfChange to the underlying raft.
+func TestRawNodeProposeAndConfChange(t *testing.T) {
+	s := NewMemoryStorage()
+	var err error
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rawNode.Campaign()
+	proposed := false
+	var lastIndex uint64
+	var ccdata []byte
+	for {
+		rd := rawNode.Ready()
+		s.Append(rd.Entries)
+		// Once we are the leader, propose a command and a ConfChange.
+		if !proposed && rd.SoftState.Lead == rawNode.raft.id {
+			rawNode.Propose([]byte("somedata"))
+
+			cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+			ccdata, err = cc.Marshal()
+			if err != nil {
+				t.Fatal(err)
+			}
+			rawNode.ProposeConfChange(cc)
+
+			proposed = true
+		}
+		rawNode.Advance(rd)
+
+		// Exit when we have four entries: one ConfChange, one no-op for the election,
+		// our proposed command and proposed ConfChange.
+		lastIndex, err = s.LastIndex()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if lastIndex >= 4 {
+			break
+		}
+	}
+
+	entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(entries) != 2 {
+		t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
+	}
+	if !bytes.Equal(entries[0].Data, []byte("somedata")) {
+		t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
+	}
+	if entries[1].Type != raftpb.EntryConfChange {
+		t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange)
+	}
+	if !bytes.Equal(entries[1].Data, ccdata) {
+		t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
+	}
+}
+
+// TestBlockProposal from node_test.go has no equivalent in rawNode because there is
+// no leader check in RawNode.
+
+// TestNodeTick from node_test.go has no equivalent in rawNode because
+// it reaches into the raft object which is not exposed.
+
+// TestNodeStop from node_test.go has no equivalent in rawNode because there is
+// no goroutine in RawNode.
+
+// TestRawNodeStart ensures that a node can be started correctly. The node should
+// start with correct configuration change entries, and can accept and commit
+// proposals.
+func TestRawNodeStart(t *testing.T) {
+	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+	ccdata, err := cc.Marshal()
+	if err != nil {
+		t.Fatalf("unexpected marshal error: %v", err)
+	}
+	wants := []Ready{
+		{
+			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+			HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
+			Entries: []raftpb.Entry{
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 2, Index: 2},
+			},
+			CommittedEntries: []raftpb.Entry{
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 2, Index: 2},
+			},
+		},
+		{
+			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
+			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
+			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
+		},
+	}
+
+	storage := NewMemoryStorage()
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rawNode.Campaign()
+	rd := rawNode.Ready()
+	t.Logf("rd %v", rd)
+	if !reflect.DeepEqual(rd, wants[0]) {
+		t.Fatalf("#%d: g = %+v,\n             w   %+v", 1, rd, wants[0])
+	} else {
+		storage.Append(rd.Entries)
+		rawNode.Advance(rd)
+	}
+
+	rawNode.Propose([]byte("foo"))
+	if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {
+		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, rd, wants[1])
+	} else {
+		storage.Append(rd.Entries)
+		rawNode.Advance(rd)
+	}
+
+	if rawNode.HasReady() {
+		t.Errorf("unexpected Ready: %+v", rawNode.Ready())
+	}
+}
+
+func TestRawNodeRestart(t *testing.T) {
+	entries := []raftpb.Entry{
+		{Term: 1, Index: 1},
+		{Term: 1, Index: 2, Data: []byte("foo")},
+	}
+	st := raftpb.HardState{Term: 1, Commit: 1}
+
+	want := Ready{
+		HardState: emptyState,
+		// commit up to commit index in st
+		CommittedEntries: entries[:st.Commit],
+	}
+
+	storage := NewMemoryStorage()
+	storage.SetHardState(st)
+	storage.Append(entries)
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rd := rawNode.Ready()
+	if !reflect.DeepEqual(rd, want) {
+		t.Errorf("g = %+v,\n             w   %+v", rd, want)
+	}
+	rawNode.Advance(rd)
+	if rawNode.HasReady() {
+		t.Errorf("unexpected Ready: %+v", rawNode.Ready())
+	}
+}
+
+func TestRawNodeRestartFromSnapshot(t *testing.T) {
+	snap := raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{
+			ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
+			Index:     2,
+			Term:      1,
+		},
+	}
+	entries := []raftpb.Entry{
+		{Term: 1, Index: 3, Data: []byte("foo")},
+	}
+	st := raftpb.HardState{Term: 1, Commit: 3}
+
+	want := Ready{
+		HardState: emptyState,
+		// the entry after the snapshot, up to the commit index in st, is expected as committed
+		CommittedEntries: entries,
+	}
+
+	s := NewMemoryStorage()
+	s.SetHardState(st)
+	s.ApplySnapshot(snap)
+	s.Append(entries)
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
+		t.Errorf("g = %+v,\n             w   %+v", rd, want)
+	} else {
+		rawNode.Advance(rd)
+	}
+	if rawNode.HasReady() {
+		t.Errorf("unexpected Ready: %+v", rawNode.Ready()) // print the pending Ready itself, not the bool
+	}
+}
+
+// TestNodeAdvance from node_test.go has no equivalent in rawNode because there is
+// no dependency check between Ready() and Advance()
+
+func TestRawNodeStatus(t *testing.T) {
+	storage := NewMemoryStorage()
+	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	status := rawNode.Status()
+	if status == nil {
+		t.Errorf("expected status struct, got nil")
+	}
+}