v3_alarm_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "os"
  18. "path/filepath"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/etcd/mvcc"
  25. "github.com/coreos/etcd/mvcc/backend"
  26. "github.com/coreos/etcd/pkg/testutil"
  27. )
  28. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  29. func TestV3StorageQuotaApply(t *testing.T) {
  30. testutil.AfterTest(t)
  31. quotasize := int64(16 * os.Getpagesize())
  32. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  33. defer clus.Terminate(t)
  34. kvc0 := toGRPC(clus.Client(0)).KV
  35. kvc1 := toGRPC(clus.Client(1)).KV
  36. // Set a quota on one node
  37. clus.Members[0].QuotaBackendBytes = quotasize
  38. clus.Members[0].Stop(t)
  39. clus.Members[0].Restart(t)
  40. clus.waitLeader(t, clus.Members)
  41. waitForRestart(t, kvc0)
  42. key := []byte("abc")
  43. // test small put still works
  44. smallbuf := make([]byte, 1024)
  45. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  46. if serr != nil {
  47. t.Fatal(serr)
  48. }
  49. // test big put
  50. bigbuf := make([]byte, quotasize)
  51. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  52. if err != nil {
  53. t.Fatal(err)
  54. }
  55. // quorum get should work regardless of whether alarm is raised
  56. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. // wait until alarm is raised for sure-- poll the alarms
  61. stopc := time.After(5 * time.Second)
  62. for {
  63. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  64. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  65. if aerr != nil {
  66. t.Fatal(aerr)
  67. }
  68. if len(resp.Alarms) != 0 {
  69. break
  70. }
  71. select {
  72. case <-stopc:
  73. t.Fatalf("timed out waiting for alarm")
  74. case <-time.After(10 * time.Millisecond):
  75. }
  76. }
  77. // small quota machine should reject put
  78. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  79. t.Fatalf("past-quota instance should reject put")
  80. }
  81. // large quota machine should reject put
  82. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  83. t.Fatalf("past-quota instance should reject put")
  84. }
  85. // reset large quota node to ensure alarm persisted
  86. clus.Members[1].Stop(t)
  87. clus.Members[1].Restart(t)
  88. clus.waitLeader(t, clus.Members)
  89. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  90. t.Fatalf("alarmed instance should reject put after reset")
  91. }
  92. }
  93. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  94. func TestV3AlarmDeactivate(t *testing.T) {
  95. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  96. defer clus.Terminate(t)
  97. kvc := toGRPC(clus.RandClient()).KV
  98. mt := toGRPC(clus.RandClient()).Maintenance
  99. alarmReq := &pb.AlarmRequest{
  100. MemberID: 123,
  101. Action: pb.AlarmRequest_ACTIVATE,
  102. Alarm: pb.AlarmType_NOSPACE,
  103. }
  104. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  105. t.Fatal(err)
  106. }
  107. key := []byte("abc")
  108. smallbuf := make([]byte, 512)
  109. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  110. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  111. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  112. }
  113. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  114. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  115. t.Fatal(err)
  116. }
  117. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  118. t.Fatal(err)
  119. }
  120. }
  121. type fakeConsistentIndex struct{ rev uint64 }
  122. func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev }
  123. func TestV3CorruptAlarm(t *testing.T) {
  124. defer testutil.AfterTest(t)
  125. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  126. defer clus.Terminate(t)
  127. var wg sync.WaitGroup
  128. wg.Add(10)
  129. for i := 0; i < 10; i++ {
  130. go func() {
  131. defer wg.Done()
  132. if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
  133. t.Fatal(err)
  134. }
  135. }()
  136. }
  137. wg.Wait()
  138. // Corrupt member 0 by modifying backend offline.
  139. clus.Members[0].Stop(t)
  140. fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
  141. be := backend.NewDefaultBackend(fp)
  142. s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13})
  143. // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
  144. s.Put([]byte("abc"), []byte("def"), 0)
  145. s.Put([]byte("xyz"), []byte("123"), 0)
  146. s.Compact(5)
  147. s.Commit()
  148. s.Close()
  149. be.Close()
  150. // Wait for cluster so Puts succeed in case member 0 was the leader.
  151. if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
  152. t.Fatal(err)
  153. }
  154. clus.Client(1).Put(context.TODO(), "xyz", "321")
  155. clus.Client(1).Put(context.TODO(), "abc", "fed")
  156. // Restart with corruption checking enabled.
  157. clus.Members[1].Stop(t)
  158. clus.Members[2].Stop(t)
  159. for _, m := range clus.Members {
  160. m.CorruptCheckTime = time.Second
  161. m.Restart(t)
  162. }
  163. // Member 0 restarts into split brain.
  164. resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
  165. if err0 != nil {
  166. t.Fatal(err0)
  167. }
  168. resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
  169. if err1 != nil {
  170. t.Fatal(err1)
  171. }
  172. if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
  173. t.Fatalf("matching ModRevision values")
  174. }
  175. for i := 0; i < 5; i++ {
  176. presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
  177. if perr != nil {
  178. if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
  179. t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
  180. } else {
  181. return
  182. }
  183. }
  184. time.Sleep(time.Second)
  185. }
  186. t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
  187. }