
Merge pull request #8554 from gyuho/initial-hash-checking

*: check data corruption on boot
Gyu-Ho Lee, 8 years ago (commit d84d3f2f77)
9 changed files with 260 additions and 26 deletions:
  1. e2e/cluster_test.go (+8, -4)
  2. e2e/ctl_v3_test.go (+14, -0)
  3. e2e/etcd_corrupt_test.go (+129, -0)
  4. embed/config.go (+3, -2)
  5. embed/etcd.go (+13, -0)
  6. etcdmain/config.go (+1, -0)
  7. etcdmain/help.go (+3, -1)
  8. etcdserver/config.go (+4, -1)
  9. etcdserver/corrupt.go (+85, -18)

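This change adds an optional data-corruption check that runs when an already-initialized member boots, before it serves any peer or client traffic. The check is gated by a new --experimental-initial-corrupt-check flag (surfaced as embed.Config.ExperimentalInitialCorruptCheck): embed.StartEtcd now calls EtcdServer.CheckInitialHashKV before Server.Start, and the member refuses to come up if its KV hash at the current revision disagrees with a peer reporting the same compact revision. A new end-to-end test corrupts one member's bolt backend offline and asserts that the restarted member fails with a data-inconsistency error.
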
e2e/cluster_test.go (+8, -4)

@@ -112,10 +112,11 @@ type etcdProcessClusterConfig struct {
 	isClientAutoTLS       bool
 	isClientCRL           bool
 
-	forceNewCluster   bool
-	initialToken      string
-	quotaBackendBytes int64
-	noStrictReconfig  bool
+	forceNewCluster     bool
+	initialToken        string
+	quotaBackendBytes   int64
+	noStrictReconfig    bool
+	initialCorruptCheck bool
 }
 
 // newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -224,6 +225,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 		if cfg.noStrictReconfig {
 			args = append(args, "--strict-reconfig-check=false")
 		}
+		if cfg.initialCorruptCheck {
+			args = append(args, "--experimental-initial-corrupt-check")
+		}
 		var murl string
 		if cfg.metricsURLScheme != "" {
 			murl = (&url.URL{

e2e/ctl_v3_test.go (+14, -0)

@@ -55,6 +55,7 @@ type ctlCtx struct {
 	t                 *testing.T
 	cfg               etcdProcessClusterConfig
 	quotaBackendBytes int64
+	corruptFunc       func(string) error
 	noStrictReconfig  bool
 
 	epc *etcdProcessCluster
@@ -69,6 +70,8 @@ type ctlCtx struct {
 	user string
 	pass string
 
+	initialCorruptCheck bool
+
 	// for compaction
 	compactPhysical bool
 }
@@ -105,6 +108,14 @@ func withCompactPhysical() ctlOption {
 	return func(cx *ctlCtx) { cx.compactPhysical = true }
 }
 
+func withInitialCorruptCheck() ctlOption {
+	return func(cx *ctlCtx) { cx.initialCorruptCheck = true }
+}
+
+func withCorruptFunc(f func(string) error) ctlOption {
+	return func(cx *ctlCtx) { cx.corruptFunc = f }
+}
+
 func withNoStrictReconfig() ctlOption {
 	return func(cx *ctlCtx) { cx.noStrictReconfig = true }
 }
@@ -131,6 +142,9 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 		ret.cfg.quotaBackendBytes = ret.quotaBackendBytes
 	}
 	ret.cfg.noStrictReconfig = ret.noStrictReconfig
+	if ret.initialCorruptCheck {
+		ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
+	}
 
 	epc, err := newEtcdProcessCluster(&ret.cfg)
 	if err != nil {

e2e/etcd_corrupt_test.go (+129, -0)

@@ -0,0 +1,129 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+
+	bolt "github.com/coreos/bbolt"
+)
+
+// TODO: test with embedded etcd in integration package
+
+func TestEtcdCorruptHash(t *testing.T) {
+	oldenv := os.Getenv("EXPECT_DEBUG")
+	defer os.Setenv("EXPECT_DEBUG", oldenv)
+	os.Setenv("EXPECT_DEBUG", "1")
+
+	cfg := configNoTLS
+
+	// trigger snapshot so that the restarted member can load peers from disk
+	cfg.snapCount = 3
+
+	testCtl(t, corruptTest, withQuorum(),
+		withCfg(cfg),
+		withInitialCorruptCheck(),
+		withCorruptFunc(corruptHash),
+	)
+}
+
+func corruptTest(cx ctlCtx) {
+	for i := 0; i < 10; i++ {
+		if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
+			if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
+				cx.t.Fatalf("putTest ctlV3Put error (%v)", err)
+			}
+		}
+	}
+	// enough time for all nodes to sync on the same data
+	time.Sleep(3 * time.Second)
+
+	eps := cx.epc.EndpointsV3()
+	cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	defer cli1.Close()
+
+	sresp, err := cli1.Status(context.TODO(), eps[0])
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	id0 := sresp.Header.GetMemberId()
+
+	cx.epc.procs[0].Stop()
+
+	// corrupt first member by modifying backend offline.
+	fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
+	if err = cx.corruptFunc(fp); err != nil {
+		cx.t.Fatal(err)
+	}
+
+	ep := cx.epc.procs[0]
+	proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	defer proc.Stop()
+
+	// restarting corrupted member should fail
+	waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
+}
+
+func corruptHash(fpath string) error {
+	db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{})
+	if derr != nil {
+		return derr
+	}
+	defer db.Close()
+
+	return db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket([]byte("key"))
+		if b == nil {
+			return errors.New("got nil bucket for 'key'")
+		}
+		keys, vals := [][]byte{}, [][]byte{}
+		c := b.Cursor()
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			keys = append(keys, k)
+			var kv mvccpb.KeyValue
+			if uerr := kv.Unmarshal(v); uerr != nil {
+				return uerr
+			}
+			kv.Key[0]++
+			kv.Value[0]++
+			v2, v2err := kv.Marshal()
+			if v2err != nil {
+				return v2err
+			}
+			vals = append(vals, v2)
+		}
+		for i := range keys {
+			if perr := b.Put(keys[i], vals[i]); perr != nil {
+				return perr
+			}
+		}
+		return nil
+	})
+}
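
The test stops the first member, increments the first byte of the Key and Value fields of every KeyValue proto stored in the backend's "key" bucket (leaving the rest of the data directory intact), and then expects the restarted process to fail with the data-inconsistency message instead of rejoining the cluster. It should be runnable on its own with the standard Go test runner, e.g. go test -run TestEtcdCorruptHash ./e2e, assuming the e2e harness's prerequisites are in place (the suite spawns pre-built etcd binaries via expect).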

embed/config.go (+3, -2)

@@ -171,8 +171,9 @@ type Config struct {
 
 	// Experimental flags
 
-	ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
-	ExperimentalEnableV2V3       string        `json:"experimental-enable-v2v3"`
+	ExperimentalInitialCorruptCheck bool          `json:"experimental-initial-corrupt-check"`
+	ExperimentalCorruptCheckTime    time.Duration `json:"experimental-corrupt-check-time"`
+	ExperimentalEnableV2V3          string        `json:"experimental-enable-v2v3"`
 }
 
 // configYAML holds the config suitable for yaml parsing

embed/etcd.go (+13, -0)

@@ -124,7 +124,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		token   string
 	)
 
+	memberInitialized := true
 	if !isMemberInitialized(cfg) {
+		memberInitialized = false
 		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
 		if err != nil {
 			return e, fmt.Errorf("error setting up initial cluster: %v", err)
@@ -175,6 +177,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		StrictReconfigCheck:     cfg.StrictReconfigCheck,
 		ClientCertAuthEnabled:   cfg.ClientTLSInfo.ClientCertAuth,
 		AuthToken:               cfg.AuthToken,
+		InitialCorruptCheck:     cfg.ExperimentalInitialCorruptCheck,
 		CorruptCheckTime:        cfg.ExperimentalCorruptCheckTime,
 	}
 
@@ -185,6 +188,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 	// buffer channel so goroutines on closed connections won't wait forever
 	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
 
+	// newly started member ("memberInitialized==false")
+	// does not need corruption check
+	if memberInitialized {
+		if err = e.Server.CheckInitialHashKV(); err != nil {
+			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
+			// (nothing to close since rafthttp transports have not been started)
+			e.Server = nil
+			return e, err
+		}
+	}
 	e.Server.Start()
 
 	if err = e.servePeers(); err != nil {
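
For embedded users, the new check surfaces as an error returned from embed.StartEtcd. Below is a minimal sketch, not part of this change; the data directory name and the error handling are illustrative assumptions:

	package main

	import (
		"log"

		"github.com/coreos/etcd/embed"
	)

	func main() {
		cfg := embed.NewConfig()
		cfg.Dir = "member1.etcd" // assumed data directory of an already-initialized member
		cfg.ExperimentalInitialCorruptCheck = true

		e, err := embed.StartEtcd(cfg)
		if err != nil {
			// with the check enabled, a corrupted member fails here with a
			// "... found data inconsistency with peers" error instead of serving traffic
			log.Fatal(err)
		}
		defer e.Close()
		<-e.Server.ReadyNotify()
	}

Note that when the check fails, StartEtcd sets e.Server to nil before returning the error (see the embed/etcd.go hunk above), so callers should not touch e.Server on the error path.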

etcdmain/config.go (+1, -0)

@@ -213,6 +213,7 @@ func newConfig() *config {
 	fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
 
 	// experimental
+	fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
 
 	// ignored

etcdmain/help.go (+3, -1)

@@ -187,8 +187,10 @@ auth flags:
 		Specify a v3 authentication token type and its options ('simple' or 'jwt').
 
 experimental flags:
+	--experimental-initial-corrupt-check 'false'
+		enable to check data corruption before serving any client/peer traffic.
 	--experimental-corrupt-check-time '0s'
-	        duration of time between cluster corruption check passes.
+		duration of time between cluster corruption check passes.
 	--experimental-enable-v2v3 ''
 		serve v2 requests through the v3 backend under a given prefix.
 `

etcdserver/config.go (+4, -1)

@@ -66,7 +66,10 @@ type ServerConfig struct {
 
 	AuthToken string
 
-	CorruptCheckTime time.Duration
+	// InitialCorruptCheck is true to check data corruption on boot
+	// before serving any peer/client traffic.
+	InitialCorruptCheck bool
+	CorruptCheckTime    time.Duration
 }
 
 // VerifyBootstrap sanity-checks the initial config for bootstrap case

etcdserver/corrupt.go (+85, -18)

@@ -16,14 +16,61 @@ package etcdserver
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/pkg/types"
 )
 
+// CheckInitialHashKV compares this member's initial KV hash with its peers'
+// before serving any peer/client traffic. Hashes mismatch only when they
+// differ at the requested revision while sharing the same compact revision.
+func (s *EtcdServer) CheckInitialHashKV() error {
+	if !s.Cfg.InitialCorruptCheck {
+		return nil
+	}
+
+	plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
+	h, rev, crev, err := s.kv.HashByRev(0)
+	if err != nil {
+		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
+	}
+	peers := s.getPeerHashKVs(rev)
+	mismatch := 0
+	for _, p := range peers {
+		if p.resp != nil {
+			peerID := types.ID(p.resp.Header.MemberId)
+			if h != p.resp.Hash {
+				if crev == p.resp.CompactRevision {
+					plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
+					mismatch++
+				} else {
+					plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
+				}
+			}
+			continue
+		}
+		if p.err != nil {
+			switch p.err {
+			case rpctypes.ErrFutureRev:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			case rpctypes.ErrCompacted:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			}
+		}
+	}
+	if mismatch > 0 {
+		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
+	}
+
+	plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
+	return nil
+}
+
 func (s *EtcdServer) monitorKVHash() {
 	t := s.Cfg.CorruptCheckTime
 	if t == 0 {
@@ -50,7 +97,7 @@ func (s *EtcdServer) checkHashKV() error {
 	if err != nil {
 		plog.Fatalf("failed to hash kv store (%v)", err)
 	}
-	resps := s.getPeerHashKVs(rev)
+	peers := s.getPeerHashKVs(rev)
 
 	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	err = s.linearizableReadNotify(ctx)
@@ -86,24 +133,27 @@ func (s *EtcdServer) checkHashKV() error {
 		mismatch(uint64(s.ID()))
 	}
 
-	for _, resp := range resps {
-		id := resp.Header.MemberId
+	for _, p := range peers {
+		if p.resp == nil {
+			continue
+		}
+		id := p.resp.Header.MemberId
 
 		// leader expects follower's latest revision less than or equal to leader's
-		if resp.Header.Revision > rev2 {
+		if p.resp.Header.Revision > rev2 {
 			plog.Warningf(
 				"revision %d from member %v, expected at most %d",
-				resp.Header.Revision,
+				p.resp.Header.Revision,
 				types.ID(id),
 				rev2)
 			mismatch(id)
 		}
 
 		// leader expects follower's latest compact revision less than or equal to leader's
-		if resp.CompactRevision > crev2 {
+		if p.resp.CompactRevision > crev2 {
 			plog.Warningf(
 				"compact revision %d from member %v, expected at most %d",
-				resp.CompactRevision,
+				p.resp.CompactRevision,
 				types.ID(id),
 				crev2,
 			)
@@ -111,10 +161,10 @@ func (s *EtcdServer) checkHashKV() error {
 		}
 
 		// follower's compact revision is leader's old one, then hashes must match
-		if resp.CompactRevision == crev && resp.Hash != h {
+		if p.resp.CompactRevision == crev && p.resp.Hash != h {
 			plog.Warningf(
 				"hash %d at revision %d from member %v, expected hash %d",
-				resp.Hash,
+				p.resp.Hash,
 				rev,
 				types.ID(id),
 				h,
@@ -125,36 +175,53 @@ func (s *EtcdServer) checkHashKV() error {
 	return nil
 }
 
-func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
-	for _, m := range s.cluster.Members() {
+type peerHashKVResp struct {
+	resp *clientv3.HashKVResponse
+	err  error
+	eps  []string
+}
+
+func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
+	// TODO: handle the case when "s.cluster.Members" have not
+	// been populated (e.g. no snapshot to load from disk)
+	mbs := s.cluster.Members()
+	pURLs := make([][]string, len(mbs))
+	for _, m := range mbs {
 		if m.ID == s.ID() {
 			continue
 		}
+		pURLs = append(pURLs, m.PeerURLs)
+	}
 
+	for _, purls := range pURLs {
+		if len(purls) == 0 {
+			continue
+		}
 		cli, cerr := clientv3.New(clientv3.Config{
 			DialTimeout: s.Cfg.ReqTimeout(),
-			Endpoints:   m.PeerURLs,
+			Endpoints:   purls,
 		})
 		if cerr != nil {
-			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
+			plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
 			continue
 		}
 
 		respsLen := len(resps)
 		for _, c := range cli.Endpoints() {
 			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-			resp, herr := cli.HashKV(ctx, c, rev)
+			var resp *clientv3.HashKVResponse
+			resp, cerr = cli.HashKV(ctx, c, rev)
 			cancel()
-			if herr == nil {
-				cerr = herr
-				resps = append(resps, resp)
+			if cerr == nil {
+				resps = append(resps, &peerHashKVResp{resp: resp})
 				break
 			}
+			plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
 		}
 		cli.Close()
 
 		if respsLen == len(resps) {
-			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
+			resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
 		}
 	}
 	return resps