corrupt.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "go.etcd.io/etcd/clientv3"
  20. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  22. "go.etcd.io/etcd/mvcc"
  23. "go.etcd.io/etcd/pkg/traceutil"
  24. "go.etcd.io/etcd/pkg/types"
  25. "go.uber.org/zap"
  26. )
  27. // CheckInitialHashKV compares initial hash values with its peers
  28. // before serving any peer/client traffic. Only mismatch when hashes
  29. // are different at requested revision, with same compact revision.
  30. func (s *EtcdServer) CheckInitialHashKV() error {
  31. if !s.Cfg.InitialCorruptCheck {
  32. return nil
  33. }
  34. lg := s.getLogger()
  35. if lg != nil {
  36. lg.Info(
  37. "starting initial corruption check",
  38. zap.String("local-member-id", s.ID().String()),
  39. zap.Duration("timeout", s.Cfg.ReqTimeout()),
  40. )
  41. } else {
  42. plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
  43. }
  44. h, rev, crev, err := s.kv.HashByRev(0)
  45. if err != nil {
  46. return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
  47. }
  48. peers := s.getPeerHashKVs(rev)
  49. mismatch := 0
  50. for _, p := range peers {
  51. if p.resp != nil {
  52. peerID := types.ID(p.resp.Header.MemberId)
  53. fields := []zap.Field{
  54. zap.String("local-member-id", s.ID().String()),
  55. zap.Int64("local-member-revision", rev),
  56. zap.Int64("local-member-compact-revision", crev),
  57. zap.Uint32("local-member-hash", h),
  58. zap.String("remote-peer-id", peerID.String()),
  59. zap.Strings("remote-peer-endpoints", p.eps),
  60. zap.Int64("remote-peer-revision", p.resp.Header.Revision),
  61. zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision),
  62. zap.Uint32("remote-peer-hash", p.resp.Hash),
  63. }
  64. if h != p.resp.Hash {
  65. if crev == p.resp.CompactRevision {
  66. if lg != nil {
  67. lg.Warn("found different hash values from remote peer", fields...)
  68. } else {
  69. plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
  70. }
  71. mismatch++
  72. } else {
  73. if lg != nil {
  74. lg.Warn("found different compact revision values from remote peer", fields...)
  75. } else {
  76. plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
  77. }
  78. }
  79. }
  80. continue
  81. }
  82. if p.err != nil {
  83. switch p.err {
  84. case rpctypes.ErrFutureRev:
  85. if lg != nil {
  86. lg.Warn(
  87. "cannot fetch hash from slow remote peer",
  88. zap.String("local-member-id", s.ID().String()),
  89. zap.Int64("local-member-revision", rev),
  90. zap.Int64("local-member-compact-revision", crev),
  91. zap.Uint32("local-member-hash", h),
  92. zap.String("remote-peer-id", p.id.String()),
  93. zap.Strings("remote-peer-endpoints", p.eps),
  94. zap.Error(err),
  95. )
  96. } else {
  97. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  98. }
  99. case rpctypes.ErrCompacted:
  100. if lg != nil {
  101. lg.Warn(
  102. "cannot fetch hash from remote peer; local member is behind",
  103. zap.String("local-member-id", s.ID().String()),
  104. zap.Int64("local-member-revision", rev),
  105. zap.Int64("local-member-compact-revision", crev),
  106. zap.Uint32("local-member-hash", h),
  107. zap.String("remote-peer-id", p.id.String()),
  108. zap.Strings("remote-peer-endpoints", p.eps),
  109. zap.Error(err),
  110. )
  111. } else {
  112. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  113. }
  114. }
  115. }
  116. }
  117. if mismatch > 0 {
  118. return fmt.Errorf("%s found data inconsistency with peers", s.ID())
  119. }
  120. if lg != nil {
  121. lg.Info(
  122. "initial corruption checking passed; no corruption",
  123. zap.String("local-member-id", s.ID().String()),
  124. )
  125. } else {
  126. plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
  127. }
  128. return nil
  129. }
  130. func (s *EtcdServer) monitorKVHash() {
  131. t := s.Cfg.CorruptCheckTime
  132. if t == 0 {
  133. return
  134. }
  135. lg := s.getLogger()
  136. if lg != nil {
  137. lg.Info(
  138. "enabled corruption checking",
  139. zap.String("local-member-id", s.ID().String()),
  140. zap.Duration("interval", t),
  141. )
  142. } else {
  143. plog.Infof("enabled corruption checking with %s interval", t)
  144. }
  145. for {
  146. select {
  147. case <-s.stopping:
  148. return
  149. case <-time.After(t):
  150. }
  151. if !s.isLeader() {
  152. continue
  153. }
  154. if err := s.checkHashKV(); err != nil {
  155. if lg != nil {
  156. lg.Warn("failed to check hash KV", zap.Error(err))
  157. } else {
  158. plog.Debugf("check hash kv failed %v", err)
  159. }
  160. }
  161. }
  162. }
  163. func (s *EtcdServer) checkHashKV() error {
  164. lg := s.getLogger()
  165. h, rev, crev, err := s.kv.HashByRev(0)
  166. if err != nil {
  167. return err
  168. }
  169. peers := s.getPeerHashKVs(rev)
  170. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  171. err = s.linearizableReadNotify(ctx)
  172. cancel()
  173. if err != nil {
  174. return err
  175. }
  176. h2, rev2, crev2, err := s.kv.HashByRev(0)
  177. if err != nil {
  178. return err
  179. }
  180. alarmed := false
  181. mismatch := func(id uint64) {
  182. if alarmed {
  183. return
  184. }
  185. alarmed = true
  186. a := &pb.AlarmRequest{
  187. MemberID: id,
  188. Action: pb.AlarmRequest_ACTIVATE,
  189. Alarm: pb.AlarmType_CORRUPT,
  190. }
  191. s.goAttach(func() {
  192. s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
  193. })
  194. }
  195. if h2 != h && rev2 == rev && crev == crev2 {
  196. if lg != nil {
  197. lg.Warn(
  198. "found hash mismatch",
  199. zap.Int64("revision-1", rev),
  200. zap.Int64("compact-revision-1", crev),
  201. zap.Uint32("hash-1", h),
  202. zap.Int64("revision-2", rev2),
  203. zap.Int64("compact-revision-2", crev2),
  204. zap.Uint32("hash-2", h2),
  205. )
  206. } else {
  207. plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
  208. }
  209. mismatch(uint64(s.ID()))
  210. }
  211. for _, p := range peers {
  212. if p.resp == nil {
  213. continue
  214. }
  215. id := p.resp.Header.MemberId
  216. // leader expects follower's latest revision less than or equal to leader's
  217. if p.resp.Header.Revision > rev2 {
  218. if lg != nil {
  219. lg.Warn(
  220. "revision from follower must be less than or equal to leader's",
  221. zap.Int64("leader-revision", rev2),
  222. zap.Int64("follower-revision", p.resp.Header.Revision),
  223. zap.String("follower-peer-id", types.ID(id).String()),
  224. )
  225. } else {
  226. plog.Warningf(
  227. "revision %d from member %v, expected at most %d",
  228. p.resp.Header.Revision,
  229. types.ID(id),
  230. rev2)
  231. }
  232. mismatch(id)
  233. }
  234. // leader expects follower's latest compact revision less than or equal to leader's
  235. if p.resp.CompactRevision > crev2 {
  236. if lg != nil {
  237. lg.Warn(
  238. "compact revision from follower must be less than or equal to leader's",
  239. zap.Int64("leader-compact-revision", crev2),
  240. zap.Int64("follower-compact-revision", p.resp.CompactRevision),
  241. zap.String("follower-peer-id", types.ID(id).String()),
  242. )
  243. } else {
  244. plog.Warningf(
  245. "compact revision %d from member %v, expected at most %d",
  246. p.resp.CompactRevision,
  247. types.ID(id),
  248. crev2,
  249. )
  250. }
  251. mismatch(id)
  252. }
  253. // follower's compact revision is leader's old one, then hashes must match
  254. if p.resp.CompactRevision == crev && p.resp.Hash != h {
  255. if lg != nil {
  256. lg.Warn(
  257. "same compact revision then hashes must match",
  258. zap.Int64("leader-compact-revision", crev2),
  259. zap.Uint32("leader-hash", h),
  260. zap.Int64("follower-compact-revision", p.resp.CompactRevision),
  261. zap.Uint32("follower-hash", p.resp.Hash),
  262. zap.String("follower-peer-id", types.ID(id).String()),
  263. )
  264. } else {
  265. plog.Warningf(
  266. "hash %d at revision %d from member %v, expected hash %d",
  267. p.resp.Hash,
  268. rev,
  269. types.ID(id),
  270. h,
  271. )
  272. }
  273. mismatch(id)
  274. }
  275. }
  276. return nil
  277. }
  278. type peerHashKVResp struct {
  279. id types.ID
  280. eps []string
  281. resp *clientv3.HashKVResponse
  282. err error
  283. }
  284. func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
  285. // TODO: handle the case when "s.cluster.Members" have not
  286. // been populated (e.g. no snapshot to load from disk)
  287. mbs := s.cluster.Members()
  288. pss := make([]peerHashKVResp, len(mbs))
  289. for _, m := range mbs {
  290. if m.ID == s.ID() {
  291. continue
  292. }
  293. pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs})
  294. }
  295. lg := s.getLogger()
  296. for _, p := range pss {
  297. if len(p.eps) == 0 {
  298. continue
  299. }
  300. cli, cerr := clientv3.New(clientv3.Config{
  301. DialTimeout: s.Cfg.ReqTimeout(),
  302. Endpoints: p.eps,
  303. })
  304. if cerr != nil {
  305. if lg != nil {
  306. lg.Warn(
  307. "failed to create client to peer URL",
  308. zap.String("local-member-id", s.ID().String()),
  309. zap.String("remote-peer-id", p.id.String()),
  310. zap.Strings("remote-peer-endpoints", p.eps),
  311. zap.Error(cerr),
  312. )
  313. } else {
  314. plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error())
  315. }
  316. continue
  317. }
  318. respsLen := len(resps)
  319. for _, c := range cli.Endpoints() {
  320. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  321. var resp *clientv3.HashKVResponse
  322. resp, cerr = cli.HashKV(ctx, c, rev)
  323. cancel()
  324. if cerr == nil {
  325. resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil})
  326. break
  327. }
  328. if lg != nil {
  329. lg.Warn(
  330. "failed hash kv request",
  331. zap.String("local-member-id", s.ID().String()),
  332. zap.Int64("requested-revision", rev),
  333. zap.String("remote-peer-endpoint", c),
  334. zap.Error(cerr),
  335. )
  336. } else {
  337. plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
  338. }
  339. }
  340. cli.Close()
  341. if respsLen == len(resps) {
  342. resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr})
  343. }
  344. }
  345. return resps
  346. }
  347. type applierV3Corrupt struct {
  348. applierV3
  349. }
  350. func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
  351. func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
  352. return nil, nil, ErrCorrupt
  353. }
  354. func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
  355. return nil, ErrCorrupt
  356. }
  357. func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
  358. return nil, ErrCorrupt
  359. }
  360. func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
  361. return nil, ErrCorrupt
  362. }
  363. func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
  364. return nil, nil, nil, ErrCorrupt
  365. }
  366. func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
  367. return nil, ErrCorrupt
  368. }
  369. func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
  370. return nil, ErrCorrupt
  371. }