corrupt.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "github.com/coreos/etcd/clientv3"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "github.com/coreos/etcd/mvcc"
  23. "github.com/coreos/etcd/pkg/types"
  24. )
  25. // CheckInitialHashKV compares initial hash values with its peers
  26. // before serving any peer/client traffic. Only mismatch when hashes
  27. // are different at requested revision, with same compact revision.
  28. func (s *EtcdServer) CheckInitialHashKV() error {
  29. if !s.Cfg.InitialCorruptCheck {
  30. return nil
  31. }
  32. plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
  33. h, rev, crev, err := s.kv.HashByRev(0)
  34. if err != nil {
  35. return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
  36. }
  37. peers := s.getPeerHashKVs(rev)
  38. mismatch := 0
  39. for _, p := range peers {
  40. if p.resp != nil {
  41. peerID := types.ID(p.resp.Header.MemberId)
  42. if h != p.resp.Hash {
  43. if crev == p.resp.CompactRevision {
  44. plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
  45. mismatch++
  46. } else {
  47. plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
  48. }
  49. }
  50. continue
  51. }
  52. if p.err != nil {
  53. switch p.err {
  54. case rpctypes.ErrFutureRev:
  55. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  56. case rpctypes.ErrCompacted:
  57. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  58. }
  59. }
  60. }
  61. if mismatch > 0 {
  62. return fmt.Errorf("%s found data inconsistency with peers", s.ID())
  63. }
  64. plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
  65. return nil
  66. }
  67. func (s *EtcdServer) monitorKVHash() {
  68. t := s.Cfg.CorruptCheckTime
  69. if t == 0 {
  70. return
  71. }
  72. plog.Infof("enabled corruption checking with %s interval", t)
  73. for {
  74. select {
  75. case <-s.stopping:
  76. return
  77. case <-time.After(t):
  78. }
  79. if !s.isLeader() {
  80. continue
  81. }
  82. if err := s.checkHashKV(); err != nil {
  83. plog.Debugf("check hash kv failed %v", err)
  84. }
  85. }
  86. }
  87. func (s *EtcdServer) checkHashKV() error {
  88. h, rev, crev, err := s.kv.HashByRev(0)
  89. if err != nil {
  90. plog.Fatalf("failed to hash kv store (%v)", err)
  91. }
  92. peers := s.getPeerHashKVs(rev)
  93. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  94. err = s.linearizableReadNotify(ctx)
  95. cancel()
  96. if err != nil {
  97. return err
  98. }
  99. h2, rev2, crev2, err := s.kv.HashByRev(0)
  100. if err != nil {
  101. plog.Warningf("failed to hash kv store (%v)", err)
  102. return err
  103. }
  104. alarmed := false
  105. mismatch := func(id uint64) {
  106. if alarmed {
  107. return
  108. }
  109. alarmed = true
  110. a := &pb.AlarmRequest{
  111. MemberID: uint64(id),
  112. Action: pb.AlarmRequest_ACTIVATE,
  113. Alarm: pb.AlarmType_CORRUPT,
  114. }
  115. s.goAttach(func() {
  116. s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
  117. })
  118. }
  119. if h2 != h && rev2 == rev && crev == crev2 {
  120. plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
  121. mismatch(uint64(s.ID()))
  122. }
  123. for _, p := range peers {
  124. if p.resp == nil {
  125. continue
  126. }
  127. id := p.resp.Header.MemberId
  128. // leader expects follower's latest revision less than or equal to leader's
  129. if p.resp.Header.Revision > rev2 {
  130. plog.Warningf(
  131. "revision %d from member %v, expected at most %d",
  132. p.resp.Header.Revision,
  133. types.ID(id),
  134. rev2)
  135. mismatch(id)
  136. }
  137. // leader expects follower's latest compact revision less than or equal to leader's
  138. if p.resp.CompactRevision > crev2 {
  139. plog.Warningf(
  140. "compact revision %d from member %v, expected at most %d",
  141. p.resp.CompactRevision,
  142. types.ID(id),
  143. crev2,
  144. )
  145. mismatch(id)
  146. }
  147. // follower's compact revision is leader's old one, then hashes must match
  148. if p.resp.CompactRevision == crev && p.resp.Hash != h {
  149. plog.Warningf(
  150. "hash %d at revision %d from member %v, expected hash %d",
  151. p.resp.Hash,
  152. rev,
  153. types.ID(id),
  154. h,
  155. )
  156. mismatch(id)
  157. }
  158. }
  159. return nil
  160. }
  161. type peerHashKVResp struct {
  162. resp *clientv3.HashKVResponse
  163. err error
  164. eps []string
  165. }
  166. func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
  167. // TODO: handle the case when "s.cluster.Members" have not
  168. // been populated (e.g. no snapshot to load from disk)
  169. mbs := s.cluster.Members()
  170. pURLs := make([][]string, len(mbs))
  171. for _, m := range mbs {
  172. if m.ID == s.ID() {
  173. continue
  174. }
  175. pURLs = append(pURLs, m.PeerURLs)
  176. }
  177. for _, purls := range pURLs {
  178. if len(purls) == 0 {
  179. continue
  180. }
  181. cli, cerr := clientv3.New(clientv3.Config{
  182. DialTimeout: s.Cfg.ReqTimeout(),
  183. Endpoints: purls,
  184. })
  185. if cerr != nil {
  186. plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
  187. continue
  188. }
  189. respsLen := len(resps)
  190. for _, c := range cli.Endpoints() {
  191. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  192. var resp *clientv3.HashKVResponse
  193. resp, cerr = cli.HashKV(ctx, c, rev)
  194. cancel()
  195. if cerr == nil {
  196. resps = append(resps, &peerHashKVResp{resp: resp})
  197. break
  198. }
  199. plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
  200. }
  201. cli.Close()
  202. if respsLen == len(resps) {
  203. resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
  204. }
  205. }
  206. return resps
  207. }
  208. type applierV3Corrupt struct {
  209. applierV3
  210. }
  211. func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
  212. func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
  213. return nil, ErrCorrupt
  214. }
  215. func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
  216. return nil, ErrCorrupt
  217. }
  218. func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
  219. return nil, ErrCorrupt
  220. }
  221. func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
  222. return nil, ErrCorrupt
  223. }
  224. func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
  225. return nil, nil, ErrCorrupt
  226. }
  227. func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
  228. return nil, ErrCorrupt
  229. }
  230. func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
  231. return nil, ErrCorrupt
  232. }