v3_alarm_test.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "os"
  18. "path/filepath"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/etcd/mvcc"
  25. "github.com/coreos/etcd/mvcc/backend"
  26. "github.com/coreos/etcd/pkg/testutil"
  27. "go.uber.org/zap"
  28. )
  29. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  30. func TestV3StorageQuotaApply(t *testing.T) {
  31. testutil.AfterTest(t)
  32. quotasize := int64(16 * os.Getpagesize())
  33. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  34. defer clus.Terminate(t)
  35. kvc0 := toGRPC(clus.Client(0)).KV
  36. kvc1 := toGRPC(clus.Client(1)).KV
  37. // Set a quota on one node
  38. clus.Members[0].QuotaBackendBytes = quotasize
  39. clus.Members[0].Stop(t)
  40. clus.Members[0].Restart(t)
  41. clus.waitLeader(t, clus.Members)
  42. waitForRestart(t, kvc0)
  43. key := []byte("abc")
  44. // test small put still works
  45. smallbuf := make([]byte, 1024)
  46. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  47. if serr != nil {
  48. t.Fatal(serr)
  49. }
  50. // test big put
  51. bigbuf := make([]byte, quotasize)
  52. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. // quorum get should work regardless of whether alarm is raised
  57. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. // wait until alarm is raised for sure-- poll the alarms
  62. stopc := time.After(5 * time.Second)
  63. for {
  64. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  65. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  66. if aerr != nil {
  67. t.Fatal(aerr)
  68. }
  69. if len(resp.Alarms) != 0 {
  70. break
  71. }
  72. select {
  73. case <-stopc:
  74. t.Fatalf("timed out waiting for alarm")
  75. case <-time.After(10 * time.Millisecond):
  76. }
  77. }
  78. // small quota machine should reject put
  79. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  80. t.Fatalf("past-quota instance should reject put")
  81. }
  82. // large quota machine should reject put
  83. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  84. t.Fatalf("past-quota instance should reject put")
  85. }
  86. // reset large quota node to ensure alarm persisted
  87. clus.Members[1].Stop(t)
  88. clus.Members[1].Restart(t)
  89. clus.waitLeader(t, clus.Members)
  90. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  91. t.Fatalf("alarmed instance should reject put after reset")
  92. }
  93. }
  94. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  95. func TestV3AlarmDeactivate(t *testing.T) {
  96. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  97. defer clus.Terminate(t)
  98. kvc := toGRPC(clus.RandClient()).KV
  99. mt := toGRPC(clus.RandClient()).Maintenance
  100. alarmReq := &pb.AlarmRequest{
  101. MemberID: 123,
  102. Action: pb.AlarmRequest_ACTIVATE,
  103. Alarm: pb.AlarmType_NOSPACE,
  104. }
  105. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  106. t.Fatal(err)
  107. }
  108. key := []byte("abc")
  109. smallbuf := make([]byte, 512)
  110. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  111. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  112. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  113. }
  114. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  115. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  116. t.Fatal(err)
  117. }
  118. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  119. t.Fatal(err)
  120. }
  121. }
  // fakeConsistentIndex is a stub index source that always reports the fixed
  // value stored in rev. TestV3CorruptAlarm passes one to mvcc.NewStore.
  type fakeConsistentIndex struct {
  	rev uint64
  }

  // ConsistentIndex returns the fixed index value held by the stub.
  func (ci *fakeConsistentIndex) ConsistentIndex() uint64 {
  	return ci.rev
  }
  124. func TestV3CorruptAlarm(t *testing.T) {
  125. defer testutil.AfterTest(t)
  126. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  127. defer clus.Terminate(t)
  128. var wg sync.WaitGroup
  129. wg.Add(10)
  130. for i := 0; i < 10; i++ {
  131. go func() {
  132. defer wg.Done()
  133. if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
  134. t.Fatal(err)
  135. }
  136. }()
  137. }
  138. wg.Wait()
  139. // Corrupt member 0 by modifying backend offline.
  140. clus.Members[0].Stop(t)
  141. fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
  142. be := backend.NewDefaultBackend(fp)
  143. s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
  144. // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
  145. s.Put([]byte("abc"), []byte("def"), 0)
  146. s.Put([]byte("xyz"), []byte("123"), 0)
  147. s.Compact(5)
  148. s.Commit()
  149. s.Close()
  150. be.Close()
  151. // Wait for cluster so Puts succeed in case member 0 was the leader.
  152. if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
  153. t.Fatal(err)
  154. }
  155. clus.Client(1).Put(context.TODO(), "xyz", "321")
  156. clus.Client(1).Put(context.TODO(), "abc", "fed")
  157. // Restart with corruption checking enabled.
  158. clus.Members[1].Stop(t)
  159. clus.Members[2].Stop(t)
  160. for _, m := range clus.Members {
  161. m.CorruptCheckTime = time.Second
  162. m.Restart(t)
  163. }
  164. // Member 0 restarts into split brain.
  165. clus.Members[0].WaitStarted(t)
  166. resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
  167. if err0 != nil {
  168. t.Fatal(err0)
  169. }
  170. clus.Members[1].WaitStarted(t)
  171. resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
  172. if err1 != nil {
  173. t.Fatal(err1)
  174. }
  175. if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
  176. t.Fatalf("matching ModRevision values")
  177. }
  178. for i := 0; i < 5; i++ {
  179. presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
  180. if perr != nil {
  181. if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
  182. t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
  183. } else {
  184. return
  185. }
  186. }
  187. time.Sleep(time.Second)
  188. }
  189. t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
  190. }