v3_alarm_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "os"
  18. "path/filepath"
  19. "sync"
  20. "testing"
  21. "time"
  22. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  24. "go.etcd.io/etcd/mvcc"
  25. "go.etcd.io/etcd/mvcc/backend"
  26. "go.etcd.io/etcd/pkg/testutil"
  27. "go.etcd.io/etcd/pkg/traceutil"
  28. "go.uber.org/zap"
  29. )
  30. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  31. func TestV3StorageQuotaApply(t *testing.T) {
  32. testutil.AfterTest(t)
  33. quotasize := int64(16 * os.Getpagesize())
  34. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  35. defer clus.Terminate(t)
  36. kvc0 := toGRPC(clus.Client(0)).KV
  37. kvc1 := toGRPC(clus.Client(1)).KV
  38. // Set a quota on one node
  39. clus.Members[0].QuotaBackendBytes = quotasize
  40. clus.Members[0].Stop(t)
  41. clus.Members[0].Restart(t)
  42. clus.waitLeader(t, clus.Members)
  43. waitForRestart(t, kvc0)
  44. key := []byte("abc")
  45. // test small put still works
  46. smallbuf := make([]byte, 1024)
  47. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  48. if serr != nil {
  49. t.Fatal(serr)
  50. }
  51. // test big put
  52. bigbuf := make([]byte, quotasize)
  53. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. // quorum get should work regardless of whether alarm is raised
  58. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. // wait until alarm is raised for sure-- poll the alarms
  63. stopc := time.After(5 * time.Second)
  64. for {
  65. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  66. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  67. if aerr != nil {
  68. t.Fatal(aerr)
  69. }
  70. if len(resp.Alarms) != 0 {
  71. break
  72. }
  73. select {
  74. case <-stopc:
  75. t.Fatalf("timed out waiting for alarm")
  76. case <-time.After(10 * time.Millisecond):
  77. }
  78. }
  79. ctx, cancel := context.WithTimeout(context.TODO(), RequestWaitTimeout)
  80. defer cancel()
  81. // small quota machine should reject put
  82. if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  83. t.Fatalf("past-quota instance should reject put")
  84. }
  85. // large quota machine should reject put
  86. if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  87. t.Fatalf("past-quota instance should reject put")
  88. }
  89. // reset large quota node to ensure alarm persisted
  90. clus.Members[1].Stop(t)
  91. clus.Members[1].Restart(t)
  92. clus.waitLeader(t, clus.Members)
  93. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  94. t.Fatalf("alarmed instance should reject put after reset")
  95. }
  96. }
  97. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  98. func TestV3AlarmDeactivate(t *testing.T) {
  99. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  100. defer clus.Terminate(t)
  101. kvc := toGRPC(clus.RandClient()).KV
  102. mt := toGRPC(clus.RandClient()).Maintenance
  103. alarmReq := &pb.AlarmRequest{
  104. MemberID: 123,
  105. Action: pb.AlarmRequest_ACTIVATE,
  106. Alarm: pb.AlarmType_NOSPACE,
  107. }
  108. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  109. t.Fatal(err)
  110. }
  111. key := []byte("abc")
  112. smallbuf := make([]byte, 512)
  113. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  114. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  115. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  116. }
  117. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  118. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  119. t.Fatal(err)
  120. }
  121. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  122. t.Fatal(err)
  123. }
  124. }
  125. type fakeConsistentIndex struct{ rev uint64 }
  126. func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev }
  127. func TestV3CorruptAlarm(t *testing.T) {
  128. defer testutil.AfterTest(t)
  129. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  130. defer clus.Terminate(t)
  131. var wg sync.WaitGroup
  132. wg.Add(10)
  133. for i := 0; i < 10; i++ {
  134. go func() {
  135. defer wg.Done()
  136. if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
  137. t.Error(err)
  138. }
  139. }()
  140. }
  141. wg.Wait()
  142. // Corrupt member 0 by modifying backend offline.
  143. clus.Members[0].Stop(t)
  144. fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
  145. be := backend.NewDefaultBackend(fp)
  146. s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
  147. // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
  148. s.Put([]byte("abc"), []byte("def"), 0)
  149. s.Put([]byte("xyz"), []byte("123"), 0)
  150. s.Compact(traceutil.TODO(), 5)
  151. s.Commit()
  152. s.Close()
  153. be.Close()
  154. clus.Members[1].WaitOK(t)
  155. clus.Members[2].WaitOK(t)
  156. time.Sleep(time.Second * 2)
  157. // Wait for cluster so Puts succeed in case member 0 was the leader.
  158. if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
  159. t.Fatal(err)
  160. }
  161. if _, err := clus.Client(1).Put(context.TODO(), "xyz", "321"); err != nil {
  162. t.Fatal(err)
  163. }
  164. if _, err := clus.Client(1).Put(context.TODO(), "abc", "fed"); err != nil {
  165. t.Fatal(err)
  166. }
  167. // Restart with corruption checking enabled.
  168. clus.Members[1].Stop(t)
  169. clus.Members[2].Stop(t)
  170. for _, m := range clus.Members {
  171. m.CorruptCheckTime = time.Second
  172. m.Restart(t)
  173. }
  174. clus.WaitLeader(t)
  175. time.Sleep(time.Second * 2)
  176. clus.Members[0].WaitStarted(t)
  177. resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
  178. if err0 != nil {
  179. t.Fatal(err0)
  180. }
  181. clus.Members[1].WaitStarted(t)
  182. resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
  183. if err1 != nil {
  184. t.Fatal(err1)
  185. }
  186. if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
  187. t.Fatalf("matching ModRevision values")
  188. }
  189. for i := 0; i < 5; i++ {
  190. presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
  191. if perr != nil {
  192. if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
  193. t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
  194. } else {
  195. return
  196. }
  197. }
  198. time.Sleep(time.Second)
  199. }
  200. t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
  201. }