corrupt.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "go.etcd.io/etcd/clientv3"
  20. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  22. "go.etcd.io/etcd/mvcc"
  23. "go.etcd.io/etcd/pkg/types"
  24. "go.uber.org/zap"
  25. )
  26. // CheckInitialHashKV compares initial hash values with its peers
  27. // before serving any peer/client traffic. Only mismatch when hashes
  28. // are different at requested revision, with same compact revision.
  29. func (s *EtcdServer) CheckInitialHashKV() error {
  30. if !s.Cfg.InitialCorruptCheck {
  31. return nil
  32. }
  33. lg := s.getLogger()
  34. if lg != nil {
  35. lg.Info(
  36. "starting initial corruption check",
  37. zap.String("local-member-id", s.ID().String()),
  38. zap.Duration("timeout", s.Cfg.ReqTimeout()),
  39. )
  40. } else {
  41. plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
  42. }
  43. h, rev, crev, err := s.kv.HashByRev(0)
  44. if err != nil {
  45. return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
  46. }
  47. peers := s.getPeerHashKVs(rev)
  48. mismatch := 0
  49. for _, p := range peers {
  50. if p.resp != nil {
  51. peerID := types.ID(p.resp.Header.MemberId)
  52. fields := []zap.Field{
  53. zap.String("local-member-id", s.ID().String()),
  54. zap.Int64("local-member-revision", rev),
  55. zap.Int64("local-member-compact-revision", crev),
  56. zap.Uint32("local-member-hash", h),
  57. zap.String("remote-peer-id", peerID.String()),
  58. zap.Strings("remote-peer-endpoints", p.eps),
  59. zap.Int64("remote-peer-revision", p.resp.Header.Revision),
  60. zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision),
  61. zap.Uint32("remote-peer-hash", p.resp.Hash),
  62. }
  63. if h != p.resp.Hash {
  64. if crev == p.resp.CompactRevision {
  65. if lg != nil {
  66. lg.Warn("found different hash values from remote peer", fields...)
  67. } else {
  68. plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
  69. }
  70. mismatch++
  71. } else {
  72. if lg != nil {
  73. lg.Warn("found different compact revision values from remote peer", fields...)
  74. } else {
  75. plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
  76. }
  77. }
  78. }
  79. continue
  80. }
  81. if p.err != nil {
  82. switch p.err {
  83. case rpctypes.ErrFutureRev:
  84. if lg != nil {
  85. lg.Warn(
  86. "cannot fetch hash from slow remote peer",
  87. zap.String("local-member-id", s.ID().String()),
  88. zap.Int64("local-member-revision", rev),
  89. zap.Int64("local-member-compact-revision", crev),
  90. zap.Uint32("local-member-hash", h),
  91. zap.String("remote-peer-id", p.id.String()),
  92. zap.Strings("remote-peer-endpoints", p.eps),
  93. zap.Error(err),
  94. )
  95. } else {
  96. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  97. }
  98. case rpctypes.ErrCompacted:
  99. if lg != nil {
  100. lg.Warn(
  101. "cannot fetch hash from remote peer; local member is behind",
  102. zap.String("local-member-id", s.ID().String()),
  103. zap.Int64("local-member-revision", rev),
  104. zap.Int64("local-member-compact-revision", crev),
  105. zap.Uint32("local-member-hash", h),
  106. zap.String("remote-peer-id", p.id.String()),
  107. zap.Strings("remote-peer-endpoints", p.eps),
  108. zap.Error(err),
  109. )
  110. } else {
  111. plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
  112. }
  113. }
  114. }
  115. }
  116. if mismatch > 0 {
  117. return fmt.Errorf("%s found data inconsistency with peers", s.ID())
  118. }
  119. if lg != nil {
  120. lg.Info(
  121. "initial corruption checking passed; no corruption",
  122. zap.String("local-member-id", s.ID().String()),
  123. )
  124. } else {
  125. plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
  126. }
  127. return nil
  128. }
  129. func (s *EtcdServer) monitorKVHash() {
  130. t := s.Cfg.CorruptCheckTime
  131. if t == 0 {
  132. return
  133. }
  134. lg := s.getLogger()
  135. if lg != nil {
  136. lg.Info(
  137. "enabled corruption checking",
  138. zap.String("local-member-id", s.ID().String()),
  139. zap.Duration("interval", t),
  140. )
  141. } else {
  142. plog.Infof("enabled corruption checking with %s interval", t)
  143. }
  144. for {
  145. select {
  146. case <-s.stopping:
  147. return
  148. case <-time.After(t):
  149. }
  150. if !s.isLeader() {
  151. continue
  152. }
  153. if err := s.checkHashKV(); err != nil {
  154. if lg != nil {
  155. lg.Warn("failed to check hash KV", zap.Error(err))
  156. } else {
  157. plog.Debugf("check hash kv failed %v", err)
  158. }
  159. }
  160. }
  161. }
  162. func (s *EtcdServer) checkHashKV() error {
  163. lg := s.getLogger()
  164. h, rev, crev, err := s.kv.HashByRev(0)
  165. if err != nil {
  166. return err
  167. }
  168. peers := s.getPeerHashKVs(rev)
  169. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  170. err = s.linearizableReadNotify(ctx)
  171. cancel()
  172. if err != nil {
  173. return err
  174. }
  175. h2, rev2, crev2, err := s.kv.HashByRev(0)
  176. if err != nil {
  177. return err
  178. }
  179. alarmed := false
  180. mismatch := func(id uint64) {
  181. if alarmed {
  182. return
  183. }
  184. alarmed = true
  185. a := &pb.AlarmRequest{
  186. MemberID: id,
  187. Action: pb.AlarmRequest_ACTIVATE,
  188. Alarm: pb.AlarmType_CORRUPT,
  189. }
  190. s.goAttach(func() {
  191. s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
  192. })
  193. }
  194. if h2 != h && rev2 == rev && crev == crev2 {
  195. if lg != nil {
  196. lg.Warn(
  197. "found hash mismatch",
  198. zap.Int64("revision-1", rev),
  199. zap.Int64("compact-revision-1", crev),
  200. zap.Uint32("hash-1", h),
  201. zap.Int64("revision-2", rev2),
  202. zap.Int64("compact-revision-2", crev2),
  203. zap.Uint32("hash-2", h2),
  204. )
  205. } else {
  206. plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
  207. }
  208. mismatch(uint64(s.ID()))
  209. }
  210. for _, p := range peers {
  211. if p.resp == nil {
  212. continue
  213. }
  214. id := p.resp.Header.MemberId
  215. // leader expects follower's latest revision less than or equal to leader's
  216. if p.resp.Header.Revision > rev2 {
  217. if lg != nil {
  218. lg.Warn(
  219. "revision from follower must be less than or equal to leader's",
  220. zap.Int64("leader-revision", rev2),
  221. zap.Int64("follower-revision", p.resp.Header.Revision),
  222. zap.String("follower-peer-id", types.ID(id).String()),
  223. )
  224. } else {
  225. plog.Warningf(
  226. "revision %d from member %v, expected at most %d",
  227. p.resp.Header.Revision,
  228. types.ID(id),
  229. rev2)
  230. }
  231. mismatch(id)
  232. }
  233. // leader expects follower's latest compact revision less than or equal to leader's
  234. if p.resp.CompactRevision > crev2 {
  235. if lg != nil {
  236. lg.Warn(
  237. "compact revision from follower must be less than or equal to leader's",
  238. zap.Int64("leader-compact-revision", crev2),
  239. zap.Int64("follower-compact-revision", p.resp.CompactRevision),
  240. zap.String("follower-peer-id", types.ID(id).String()),
  241. )
  242. } else {
  243. plog.Warningf(
  244. "compact revision %d from member %v, expected at most %d",
  245. p.resp.CompactRevision,
  246. types.ID(id),
  247. crev2,
  248. )
  249. }
  250. mismatch(id)
  251. }
  252. // follower's compact revision is leader's old one, then hashes must match
  253. if p.resp.CompactRevision == crev && p.resp.Hash != h {
  254. if lg != nil {
  255. lg.Warn(
  256. "same compact revision then hashes must match",
  257. zap.Int64("leader-compact-revision", crev2),
  258. zap.Uint32("leader-hash", h),
  259. zap.Int64("follower-compact-revision", p.resp.CompactRevision),
  260. zap.Uint32("follower-hash", p.resp.Hash),
  261. zap.String("follower-peer-id", types.ID(id).String()),
  262. )
  263. } else {
  264. plog.Warningf(
  265. "hash %d at revision %d from member %v, expected hash %d",
  266. p.resp.Hash,
  267. rev,
  268. types.ID(id),
  269. h,
  270. )
  271. }
  272. mismatch(id)
  273. }
  274. }
  275. return nil
  276. }
  277. type peerHashKVResp struct {
  278. id types.ID
  279. eps []string
  280. resp *clientv3.HashKVResponse
  281. err error
  282. }
  283. func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
  284. // TODO: handle the case when "s.cluster.Members" have not
  285. // been populated (e.g. no snapshot to load from disk)
  286. mbs := s.cluster.Members()
  287. pss := make([]peerHashKVResp, len(mbs))
  288. for _, m := range mbs {
  289. if m.ID == s.ID() {
  290. continue
  291. }
  292. pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs})
  293. }
  294. lg := s.getLogger()
  295. for _, p := range pss {
  296. if len(p.eps) == 0 {
  297. continue
  298. }
  299. cli, cerr := clientv3.New(clientv3.Config{
  300. DialTimeout: s.Cfg.ReqTimeout(),
  301. Endpoints: p.eps,
  302. })
  303. if cerr != nil {
  304. if lg != nil {
  305. lg.Warn(
  306. "failed to create client to peer URL",
  307. zap.String("local-member-id", s.ID().String()),
  308. zap.String("remote-peer-id", p.id.String()),
  309. zap.Strings("remote-peer-endpoints", p.eps),
  310. zap.Error(cerr),
  311. )
  312. } else {
  313. plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error())
  314. }
  315. continue
  316. }
  317. respsLen := len(resps)
  318. for _, c := range cli.Endpoints() {
  319. ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  320. var resp *clientv3.HashKVResponse
  321. resp, cerr = cli.HashKV(ctx, c, rev)
  322. cancel()
  323. if cerr == nil {
  324. resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil})
  325. break
  326. }
  327. if lg != nil {
  328. lg.Warn(
  329. "failed hash kv request",
  330. zap.String("local-member-id", s.ID().String()),
  331. zap.Int64("requested-revision", rev),
  332. zap.String("remote-peer-endpoint", c),
  333. zap.Error(cerr),
  334. )
  335. } else {
  336. plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
  337. }
  338. }
  339. cli.Close()
  340. if respsLen == len(resps) {
  341. resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr})
  342. }
  343. }
  344. return resps
  345. }
  346. type applierV3Corrupt struct {
  347. applierV3
  348. }
  349. func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
  350. func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
  351. return nil, ErrCorrupt
  352. }
  353. func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
  354. return nil, ErrCorrupt
  355. }
  356. func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
  357. return nil, ErrCorrupt
  358. }
  359. func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
  360. return nil, ErrCorrupt
  361. }
  362. func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
  363. return nil, nil, ErrCorrupt
  364. }
  365. func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
  366. return nil, ErrCorrupt
  367. }
  368. func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
  369. return nil, ErrCorrupt
  370. }