v3_alarm_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "os"
  18. "path/filepath"
  19. "sync"
  20. "testing"
  21. "time"
  22. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  24. "go.etcd.io/etcd/mvcc"
  25. "go.etcd.io/etcd/mvcc/backend"
  26. "go.etcd.io/etcd/pkg/testutil"
  27. "go.uber.org/zap"
  28. )
  29. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  30. func TestV3StorageQuotaApply(t *testing.T) {
  31. testutil.AfterTest(t)
  32. quotasize := int64(16 * os.Getpagesize())
  33. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  34. defer clus.Terminate(t)
  35. kvc0 := toGRPC(clus.Client(0)).KV
  36. kvc1 := toGRPC(clus.Client(1)).KV
  37. // Set a quota on one node
  38. clus.Members[0].QuotaBackendBytes = quotasize
  39. clus.Members[0].Stop(t)
  40. clus.Members[0].Restart(t)
  41. clus.waitLeader(t, clus.Members)
  42. waitForRestart(t, kvc0)
  43. key := []byte("abc")
  44. // test small put still works
  45. smallbuf := make([]byte, 1024)
  46. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  47. if serr != nil {
  48. t.Fatal(serr)
  49. }
  50. // test big put
  51. bigbuf := make([]byte, quotasize)
  52. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. // quorum get should work regardless of whether alarm is raised
  57. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. // wait until alarm is raised for sure-- poll the alarms
  62. stopc := time.After(5 * time.Second)
  63. for {
  64. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  65. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  66. if aerr != nil {
  67. t.Fatal(aerr)
  68. }
  69. if len(resp.Alarms) != 0 {
  70. break
  71. }
  72. select {
  73. case <-stopc:
  74. t.Fatalf("timed out waiting for alarm")
  75. case <-time.After(10 * time.Millisecond):
  76. }
  77. }
  78. ctx, cancel := context.WithTimeout(context.TODO(), RequestWaitTimeout)
  79. defer cancel()
  80. // small quota machine should reject put
  81. if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  82. t.Fatalf("past-quota instance should reject put")
  83. }
  84. // large quota machine should reject put
  85. if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  86. t.Fatalf("past-quota instance should reject put")
  87. }
  88. // reset large quota node to ensure alarm persisted
  89. clus.Members[1].Stop(t)
  90. clus.Members[1].Restart(t)
  91. clus.waitLeader(t, clus.Members)
  92. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  93. t.Fatalf("alarmed instance should reject put after reset")
  94. }
  95. }
  96. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  97. func TestV3AlarmDeactivate(t *testing.T) {
  98. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  99. defer clus.Terminate(t)
  100. kvc := toGRPC(clus.RandClient()).KV
  101. mt := toGRPC(clus.RandClient()).Maintenance
  102. alarmReq := &pb.AlarmRequest{
  103. MemberID: 123,
  104. Action: pb.AlarmRequest_ACTIVATE,
  105. Alarm: pb.AlarmType_NOSPACE,
  106. }
  107. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  108. t.Fatal(err)
  109. }
  110. key := []byte("abc")
  111. smallbuf := make([]byte, 512)
  112. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  113. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  114. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  115. }
  116. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  117. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  118. t.Fatal(err)
  119. }
  120. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  121. t.Fatal(err)
  122. }
  123. }
  124. type fakeConsistentIndex struct{ rev uint64 }
  125. func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev }
  126. func TestV3CorruptAlarm(t *testing.T) {
  127. defer testutil.AfterTest(t)
  128. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  129. defer clus.Terminate(t)
  130. var wg sync.WaitGroup
  131. wg.Add(10)
  132. for i := 0; i < 10; i++ {
  133. go func() {
  134. defer wg.Done()
  135. if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
  136. t.Error(err)
  137. }
  138. }()
  139. }
  140. wg.Wait()
  141. // Corrupt member 0 by modifying backend offline.
  142. clus.Members[0].Stop(t)
  143. fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
  144. be := backend.NewDefaultBackend(fp)
  145. s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
  146. // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
  147. s.Put([]byte("abc"), []byte("def"), 0)
  148. s.Put([]byte("xyz"), []byte("123"), 0)
  149. s.Compact(5)
  150. s.Commit()
  151. s.Close()
  152. be.Close()
  153. clus.Members[1].WaitOK(t)
  154. clus.Members[2].WaitOK(t)
  155. time.Sleep(time.Second * 2)
  156. // Wait for cluster so Puts succeed in case member 0 was the leader.
  157. if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
  158. t.Fatal(err)
  159. }
  160. if _, err := clus.Client(1).Put(context.TODO(), "xyz", "321"); err != nil {
  161. t.Fatal(err)
  162. }
  163. if _, err := clus.Client(1).Put(context.TODO(), "abc", "fed"); err != nil {
  164. t.Fatal(err)
  165. }
  166. // Restart with corruption checking enabled.
  167. clus.Members[1].Stop(t)
  168. clus.Members[2].Stop(t)
  169. for _, m := range clus.Members {
  170. m.CorruptCheckTime = time.Second
  171. m.Restart(t)
  172. }
  173. clus.WaitLeader(t)
  174. time.Sleep(time.Second * 2)
  175. clus.Members[0].WaitStarted(t)
  176. resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
  177. if err0 != nil {
  178. t.Fatal(err0)
  179. }
  180. clus.Members[1].WaitStarted(t)
  181. resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
  182. if err1 != nil {
  183. t.Fatal(err1)
  184. }
  185. if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
  186. t.Fatalf("matching ModRevision values")
  187. }
  188. for i := 0; i < 5; i++ {
  189. presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
  190. if perr != nil {
  191. if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
  192. t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
  193. } else {
  194. return
  195. }
  196. }
  197. time.Sleep(time.Second)
  198. }
  199. t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
  200. }