Browse Source

Merge pull request #11003 from tbg/interaction/restore

raft: fix restoring joint configurations
Tobias Grieger 6 years ago
parent
commit
4cec8dddc6

+ 1 - 4
raft/confchange/confchange.go

@@ -62,10 +62,7 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker
 		return c.err(err)
 	}
 	// Clear the outgoing config.
-	{
-		*outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
-
-	}
+	*outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
 	// Copy incoming to outgoing.
 	for id := range incoming(cfg.Voters) {
 		outgoing(cfg.Voters)[id] = struct{}{}

+ 155 - 0
raft/confchange/restore.go

@@ -0,0 +1,155 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
+)
+
+// toConfChangeSingle translates a conf state into 1) a slice of operations
+// creating the config that will become the outgoing one, and 2) another slice
+// that, when applied to the config resulting from 1), represents the
+// ConfState.
+//
+// The first slice (out) stays nil when cs.VotersOutgoing is empty, i.e. when
+// the ConfState is not joint; in that case the second slice (in) only contains
+// additions.
+func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.ConfChangeSingle) {
+	// Example to follow along this code:
+	// voters=(1 2 3) learners=(5) outgoing=(1 2 4 6) learners_next=(4)
+	//
+	// This means that before entering the joint config, the configuration
+	// had voters (1 2 4 6) and perhaps some learners that are already gone.
+	// The new set of voters is (1 2 3), i.e. (1 2) were kept around, and (4 6)
+	// are no longer voters; however 4 is poised to become a learner upon leaving
+	// the joint state.
+	// We can't tell whether 5 was a learner before entering the joint config,
+	// but it doesn't matter (we'll pretend that it wasn't).
+	//
+	// The code below will construct
+	// outgoing = add 1; add 2; add 4; add 6
+	// incoming = remove 1; remove 2; remove 4; remove 6
+	//            add 1;    add 2;    add 3;
+	//            add-learner 5;
+	//            add-learner 4;
+	//
+	// So, when starting with an empty config, after applying 'outgoing' we have
+	//
+	//   quorum=(1 2 4 6)
+	//
+	// From which we enter a joint state via 'incoming'
+	//
+	//   quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4)
+	//
+	// as desired.
+
+	for _, id := range cs.VotersOutgoing {
+		// If there are outgoing voters, first add them one by one so that the
+		// (non-joint) config has them all.
+		out = append(out, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddNode,
+			NodeID: id,
+		})
+
+	}
+
+	// We're done constructing the outgoing slice, now on to the incoming one
+	// (which will apply on top of the config created by the outgoing slice).
+
+	// First, we'll remove all of the outgoing voters.
+	for _, id := range cs.VotersOutgoing {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeRemoveNode,
+			NodeID: id,
+		})
+	}
+	// Then we'll add the incoming voters and learners.
+	for _, id := range cs.Voters {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddNode,
+			NodeID: id,
+		})
+	}
+	for _, id := range cs.Learners {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddLearnerNode,
+			NodeID: id,
+		})
+	}
+	// Same for LearnersNext; these are nodes we want to be learners but which
+	// are currently voters in the outgoing config.
+	for _, id := range cs.LearnersNext {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddLearnerNode,
+			NodeID: id,
+		})
+	}
+	return out, in
+}
+
+// chain runs ops in order against chg, feeding the config and progress map
+// returned by each op into the Changer that is handed to the next op.
+//
+// It returns the config and progress map produced by the last op (or the ones
+// already held by chg if ops is empty). If any op fails, the remaining ops are
+// not run and a zero Config, a nil ProgressMap and that op's error are
+// returned.
+func chain(chg Changer, ops ...func(Changer) (tracker.Config, tracker.ProgressMap, error)) (tracker.Config, tracker.ProgressMap, error) {
+	for _, op := range ops {
+		cfg, prs, err := op(chg)
+		if err != nil {
+			return tracker.Config{}, nil, err
+		}
+		// chg was received by value, so these field assignments only update
+		// the local copy that the next op will see.
+		chg.Tracker.Config = cfg
+		chg.Tracker.Progress = prs
+	}
+	return chg.Tracker.Config, chg.Tracker.Progress, nil
+}
+
+// Restore takes a Changer (which must represent an empty configuration), and
+// runs a sequence of changes enacting the configuration described in the
+// ConfState.
+//
+// It returns the resulting config and progress map, or the error of the first
+// change that could not be applied.
+//
+// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure
+// the Changer only needs a ProgressMap (not a whole Tracker) at which point
+// this can just take LastIndex and MaxInflight directly instead and cook up
+// the results from that alone.
+func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap, error) {
+	outgoing, incoming := toConfChangeSingle(cs)
+
+	// The individual steps are collected as closures and executed by chain,
+	// which threads the intermediate config through them.
+	var ops []func(Changer) (tracker.Config, tracker.ProgressMap, error)
+
+	if len(outgoing) == 0 {
+		// No outgoing config, so just apply the incoming changes one by one.
+		for _, cc := range incoming {
+			cc := cc // loop-local copy, so each closure captures its own change
+			ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+				return chg.Simple(cc)
+			})
+		}
+	} else {
+		// The ConfState describes a joint configuration.
+		//
+		// First, apply all of the changes of the outgoing config one by one, so
+		// that it temporarily becomes the incoming active config. For example,
+		// if the config is (1 2 3)&(2 3 4), this will establish (2 3 4)&().
+		for _, cc := range outgoing {
+			cc := cc // loop-local copy, so each closure captures its own change
+			ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+				return chg.Simple(cc)
+			})
+		}
+		// Now enter the joint state, which rotates the above additions into the
+		// outgoing config, and adds the incoming config in. Continuing the
+		// example above, we'd get (1 2 3)&(2 3 4), i.e. the incoming operations
+		// would be removing 2,3,4 and then adding in 1,2,3 while transitioning
+		// into a joint state.
+		ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+			return chg.EnterJoint(cs.AutoLeave, incoming...)
+		})
+	}
+
+	return chain(chg, ops...)
+}

+ 142 - 0
raft/confchange/restore_test.go

@@ -0,0 +1,142 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+	"math/rand"
+	"reflect"
+	"sort"
+	"testing"
+	"testing/quick"
+
+	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
+)
+
+// rndConfChange is a pb.ConfState with a Generate method, so that random
+// instances can be produced by testing/quick.
+type rndConfChange pb.ConfState
+
+// Generate creates a random (valid) ConfState for use with quickcheck.
+//
+// The result has 1-5 voters, 0-4 learners and 0-2 voters that are only in the
+// outgoing config; it may additionally retain some of the incoming voters in
+// the outgoing config. The size hint is ignored.
+func (rndConfChange) Generate(rand *rand.Rand, _ int) reflect.Value {
+	conv := func(sl []int) []uint64 {
+		// We want IDs but the incoming slice is zero-indexed, so add one to
+		// each.
+		out := make([]uint64, len(sl))
+		for i := range sl {
+			out[i] = uint64(sl[i] + 1)
+		}
+		return out
+	}
+	var cs pb.ConfState
+	// NB: never generate the empty ConfState, that one should be unit tested.
+	nVoters := 1 + rand.Intn(5)
+
+	nLearners := rand.Intn(5)
+	// The number of voters that are in the outgoing config but not in the
+	// incoming one. (We'll additionally retain a random number of the
+	// incoming voters below).
+	nRemovedVoters := rand.Intn(3)
+
+	// Voters, learners, and removed voters must not overlap. A "removed voter"
+	// is one that we have in the outgoing config but not the incoming one.
+	//
+	// The permutation covers twice as many IDs as are consumed below, so the
+	// IDs that end up in the ConfState are not necessarily contiguous.
+	ids := conv(rand.Perm(2 * (nVoters + nLearners + nRemovedVoters)))
+
+	cs.Voters = ids[:nVoters]
+	ids = ids[nVoters:]
+
+	if nLearners > 0 {
+		cs.Learners = ids[:nLearners]
+		ids = ids[nLearners:]
+	}
+
+	// Roll the dice on how many of the incoming voters we decide were also
+	// previously voters.
+	//
+	// NB: this code avoids creating non-nil empty slices (here and below).
+	nOutgoingRetainedVoters := rand.Intn(nVoters + 1)
+	if nOutgoingRetainedVoters > 0 || nRemovedVoters > 0 {
+		// Appending to a nil slice copies the retained voters instead of
+		// aliasing cs.Voters.
+		cs.VotersOutgoing = append([]uint64(nil), cs.Voters[:nOutgoingRetainedVoters]...)
+		cs.VotersOutgoing = append(cs.VotersOutgoing, ids[:nRemovedVoters]...)
+	}
+	// Only outgoing voters that are not also incoming voters can be in
+	// LearnersNext (they represent demotions).
+	if nRemovedVoters > 0 {
+		if nLearnersNext := rand.Intn(nRemovedVoters + 1); nLearnersNext > 0 {
+			cs.LearnersNext = ids[:nLearnersNext]
+		}
+	}
+
+	// AutoLeave is only ever set when there is an outgoing config.
+	cs.AutoLeave = len(cs.VotersOutgoing) > 0 && rand.Intn(2) == 1
+	return reflect.ValueOf(rndConfChange(cs))
+}
+
+// TestRestore verifies that Restore, applied to an empty Changer, produces a
+// tracker whose ConfState matches the ConfState that was restored. It runs a
+// handful of hand-written cases followed by randomly generated ones.
+func TestRestore(t *testing.T) {
+	cfg := quick.Config{MaxCount: 1000}
+
+	// f restores cs into a fresh Changer and reports whether the resulting
+	// tracker round-trips to the same ConfState.
+	f := func(cs pb.ConfState) bool {
+		chg := Changer{
+			Tracker:   tracker.MakeProgressTracker(20),
+			LastIndex: 10,
+		}
+		cfg, prs, err := Restore(chg, cs)
+		if err != nil {
+			t.Error(err)
+			return false
+		}
+		chg.Tracker.Config = cfg
+		chg.Tracker.Progress = prs
+
+		// Sort the input slices in place so that the DeepEqual below is not
+		// sensitive to the order in which IDs were specified.
+		// NOTE(review): this relies on Tracker.ConfState() returning sorted
+		// slices — confirm against quorum.MajorityConfig.Slice.
+		for _, sl := range [][]uint64{
+			cs.Voters,
+			cs.Learners,
+			cs.VotersOutgoing,
+			cs.LearnersNext,
+		} {
+			sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
+		}
+
+		cs2 := chg.Tracker.ConfState()
+		// NB: cs.Equivalent does the same "sorting" dance internally, but let's
+		// test it a bit here instead of relying on it.
+		if reflect.DeepEqual(cs, cs2) && cs.Equivalent(cs2) == nil && cs2.Equivalent(cs) == nil {
+			return true // success
+		}
+		t.Errorf(`
+before: %+#v
+after:  %+#v`, cs, cs2)
+		return false
+	}
+
+	ids := func(sl ...uint64) []uint64 {
+		return sl
+	}
+
+	// Unit tests.
+	for _, cs := range []pb.ConfState{
+		{},
+		{Voters: ids(1, 2, 3)},
+		{Voters: ids(1, 2, 3), Learners: ids(4, 5, 6)},
+		{Voters: ids(1, 2, 3), Learners: ids(5), VotersOutgoing: ids(1, 2, 4, 6), LearnersNext: ids(4)},
+	} {
+		if !f(cs) {
+			t.FailNow() // f() already logged a nice t.Error()
+		}
+	}
+
+	// Randomized tests; the ConfStates come from rndConfChange.Generate.
+	if err := quick.Check(func(cs rndConfChange) bool {
+		return f(pb.ConfState(cs))
+	}, &cfg); err != nil {
+		t.Error(err)
+	}
+}

+ 38 - 28
raft/raft.go

@@ -263,7 +263,8 @@ type raft struct {
 
 	maxMsgSize         uint64
 	maxUncommittedSize uint64
-	prs                tracker.ProgressTracker
+	// TODO(tbg): rename to trk.
+	prs tracker.ProgressTracker
 
 	state StateType
 
@@ -327,18 +328,18 @@ func newRaft(c *Config) *raft {
 	if err != nil {
 		panic(err) // TODO(bdarnell)
 	}
-	peers := c.peers
-	learners := c.learners
-	if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
-		if len(peers) > 0 || len(learners) > 0 {
+
+	if len(c.peers) > 0 || len(c.learners) > 0 {
+		if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
 			// TODO(bdarnell): the peers argument is always nil except in
 			// tests; the argument should be removed and these tests should be
 			// updated to specify their nodes through a snapshot.
 			panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
 		}
-		peers = cs.Voters
-		learners = cs.Learners
+		cs.Voters = c.peers
+		cs.Learners = c.learners
 	}
+
 	r := &raft{
 		id:                        c.ID,
 		lead:                      None,
@@ -355,14 +356,15 @@ func newRaft(c *Config) *raft {
 		readOnly:                  newReadOnly(c.ReadOnlyOption),
 		disableProposalForwarding: c.DisableProposalForwarding,
 	}
-	for _, p := range peers {
-		// Add node to active config.
-		r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}.AsV2())
-	}
-	for _, p := range learners {
-		// Add learner to active config.
-		r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}.AsV2())
+
+	cfg, prs, err := confchange.Restore(confchange.Changer{
+		Tracker:   r.prs,
+		LastIndex: raftlog.lastIndex(),
+	}, cs)
+	if err != nil {
+		panic(err)
 	}
+	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
 
 	if !isHardStateEqual(hs, emptyState) {
 		r.loadState(hs)
@@ -1430,13 +1432,19 @@ func (r *raft) restore(s pb.Snapshot) bool {
 
 	// Reset the configuration and add the (potentially updated) peers in anew.
 	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
-	for _, id := range s.Metadata.ConfState.Voters {
-		r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}.AsV2())
-	}
-	for _, id := range s.Metadata.ConfState.Learners {
-		r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}.AsV2())
+	cfg, prs, err := confchange.Restore(confchange.Changer{
+		Tracker:   r.prs,
+		LastIndex: r.raftLog.lastIndex(),
+	}, cs)
+
+	if err != nil {
+		// This should never happen. Either there's a bug in our config change
+		// handling or the client corrupted the conf change.
+		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
 	}
 
+	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
 	pr := r.prs.Progress[r.id]
 	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
 
@@ -1471,19 +1479,21 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
 		panic(err)
 	}
 
+	return r.switchToConfig(cfg, prs)
+}
+
+// switchToConfig reconfigures this node to use the provided configuration. It
+// updates the in-memory state and, when necessary, carries out additional
+// actions such as reacting to the removal of nodes or changed quorum
+// requirements.
+//
+// The inputs usually result from restoring a ConfState or applying a ConfChange.
+func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
 	r.prs.Config = cfg
 	r.prs.Progress = prs
 
 	r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
-	// Now that the configuration is updated, handle any side effects.
-
-	cs := pb.ConfState{
-		Voters:         r.prs.Voters[0].Slice(),
-		VotersOutgoing: r.prs.Voters[1].Slice(),
-		Learners:       quorum.MajorityConfig(r.prs.Learners).Slice(),
-		LearnersNext:   quorum.MajorityConfig(r.prs.LearnersNext).Slice(),
-		AutoLeave:      r.prs.AutoLeave,
-	}
+	cs := r.prs.ConfState()
 	pr, ok := r.prs.Progress[r.id]
 
 	// Update whether the node itself is a learner, resetting to false when the

+ 45 - 0
raft/raftpb/confstate.go

@@ -0,0 +1,45 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+)
+
+// Equivalent returns a nil error if the inputs describe the same configuration.
+// On mismatch, returns a descriptive error showing the differences.
+//
+// The comparison ignores the order of the IDs within each slice, the
+// difference between nil and empty slices, and the XXX_unrecognized field.
+// The slices of both inputs are left untouched.
+func (cs ConfState) Equivalent(cs2 ConfState) error {
+	cs1 := cs
+	// Keep the unmodified inputs around for the error message.
+	orig1, orig2 := cs1, cs2
+	// s replaces *sl with a sorted copy. Copying via append onto a nil slice
+	// avoids sorting the caller's backing array and turns an empty slice into
+	// a nil one.
+	s := func(sl *[]uint64) {
+		*sl = append([]uint64(nil), *sl...)
+		sort.Slice(*sl, func(i, j int) bool { return (*sl)[i] < (*sl)[j] })
+	}
+
+	for _, cs := range []*ConfState{&cs1, &cs2} {
+		s(&cs.Voters)
+		s(&cs.Learners)
+		s(&cs.VotersOutgoing)
+		s(&cs.LearnersNext)
+		cs.XXX_unrecognized = nil
+	}
+
+	if !reflect.DeepEqual(cs1, cs2) {
+		return fmt.Errorf("ConfStates not equivalent after sorting:\n%+#v\n%+#v\nInputs were:\n%+#v\n%+#v", cs1, cs2, orig1, orig2)
+	}
+	return nil
+}

+ 58 - 0
raft/raftpb/confstate_test.go

@@ -0,0 +1,58 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+	"testing"
+)
+
+// TestConfState_Equivalent checks that ConfState.Equivalent ignores ordering
+// and nil-vs-empty differences but reports real differences in any of the
+// compared fields.
+func TestConfState_Equivalent(t *testing.T) {
+	type testCase struct {
+		cs, cs2 ConfState
+		ok      bool // true if cs and cs2 are expected to be equivalent
+	}
+
+	testCases := []testCase{
+		// Reordered voters and learners.
+		{ConfState{
+			Voters:         []uint64{1, 2, 3},
+			Learners:       []uint64{5, 4, 6},
+			VotersOutgoing: []uint64{9, 8, 7},
+			LearnersNext:   []uint64{10, 20, 15},
+		}, ConfState{
+			Voters:         []uint64{1, 2, 3},
+			Learners:       []uint64{4, 5, 6},
+			VotersOutgoing: []uint64{7, 9, 8},
+			LearnersNext:   []uint64{20, 10, 15},
+		}, true},
+		// Not sensitive to nil vs empty slice.
+		{ConfState{Voters: []uint64{}}, ConfState{Voters: []uint64(nil)}, true},
+		// Non-equivalent voters.
+		{ConfState{Voters: []uint64{1, 2, 3, 4}}, ConfState{Voters: []uint64{2, 1, 3}}, false},
+		{ConfState{Voters: []uint64{1, 4, 3}}, ConfState{Voters: []uint64{2, 1, 3}}, false},
+		// Non-equivalent learners. (This case must populate Learners, not
+		// Voters; otherwise it merely repeats the voters case above.)
+		{ConfState{Learners: []uint64{1, 2, 3, 4}}, ConfState{Learners: []uint64{2, 1, 3}}, false},
+		// Sensitive to AutoLeave flag.
+		{ConfState{AutoLeave: true}, ConfState{}, false},
+	}
+
+	for _, tc := range testCases {
+		t.Run("", func(t *testing.T) {
+			// %v rather than %s: err is nil when an expected mismatch was
+			// not detected.
+			if err := tc.cs.Equivalent(tc.cs2); (err == nil) != tc.ok {
+				t.Fatalf("wanted equivalent: %t, got error:\n%v", tc.ok, err)
+			}
+		})
+	}
+}

+ 12 - 0
raft/tracker/tracker.go

@@ -20,6 +20,7 @@ import (
 	"strings"
 
 	"go.etcd.io/etcd/raft/quorum"
+	pb "go.etcd.io/etcd/raft/raftpb"
 )
 
 // Config reflects the configuration tracked in a ProgressTracker.
@@ -141,6 +142,17 @@ func MakeProgressTracker(maxInflight int) ProgressTracker {
 	return p
 }
 
+// ConfState returns a ConfState representing the active configuration.
+//
+// Voters[0] is reported as Voters and Voters[1] as VotersOutgoing; the
+// learner sets and the AutoLeave flag are carried over as-is.
+func (p *ProgressTracker) ConfState() pb.ConfState {
+	return pb.ConfState{
+		Voters:         p.Voters[0].Slice(),
+		VotersOutgoing: p.Voters[1].Slice(),
+		Learners:       quorum.MajorityConfig(p.Learners).Slice(),
+		LearnersNext:   quorum.MajorityConfig(p.LearnersNext).Slice(),
+		AutoLeave:      p.AutoLeave,
+	}
+}
+
 // IsSingleton returns true if (and only if) there is only one voting member
 // (i.e. the leader) in the current configuration.
 func (p *ProgressTracker) IsSingleton() bool {

+ 8 - 0
raft/util.go

@@ -133,3 +133,11 @@ func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
 	}
 	return ents[:limit]
 }
+
+// assertConfStatesEquivalent does nothing if cs1.Equivalent(cs2) reports no
+// error; otherwise it passes the resulting error to l.Panic.
+func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) {
+	err := cs1.Equivalent(cs2)
+	if err == nil {
+		return
+	}
+	l.Panic(err)
+}