maintenance_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "path/filepath"
  22. "strings"
  23. "testing"
  24. "time"
  25. "go.uber.org/zap"
  26. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  27. "github.com/coreos/etcd/integration"
  28. "github.com/coreos/etcd/lease"
  29. "github.com/coreos/etcd/mvcc"
  30. "github.com/coreos/etcd/mvcc/backend"
  31. "github.com/coreos/etcd/pkg/testutil"
  32. )
  33. func TestMaintenanceHashKV(t *testing.T) {
  34. defer testutil.AfterTest(t)
  35. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  36. defer clus.Terminate(t)
  37. for i := 0; i < 3; i++ {
  38. if _, err := clus.RandClient().Put(context.Background(), "foo", "bar"); err != nil {
  39. t.Fatal(err)
  40. }
  41. }
  42. var hv uint32
  43. for i := 0; i < 3; i++ {
  44. cli := clus.Client(i)
  45. // ensure writes are replicated
  46. if _, err := cli.Get(context.TODO(), "foo"); err != nil {
  47. t.Fatal(err)
  48. }
  49. hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0)
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. if hv == 0 {
  54. hv = hresp.Hash
  55. continue
  56. }
  57. if hv != hresp.Hash {
  58. t.Fatalf("#%d: hash expected %d, got %d", i, hv, hresp.Hash)
  59. }
  60. }
  61. }
  62. func TestMaintenanceMoveLeader(t *testing.T) {
  63. defer testutil.AfterTest(t)
  64. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  65. defer clus.Terminate(t)
  66. oldLeadIdx := clus.WaitLeader(t)
  67. targetIdx := (oldLeadIdx + 1) % 3
  68. target := uint64(clus.Members[targetIdx].ID())
  69. cli := clus.Client(targetIdx)
  70. _, err := cli.MoveLeader(context.Background(), target)
  71. if err != rpctypes.ErrNotLeader {
  72. t.Fatalf("error expected %v, got %v", rpctypes.ErrNotLeader, err)
  73. }
  74. cli = clus.Client(oldLeadIdx)
  75. _, err = cli.MoveLeader(context.Background(), target)
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. leadIdx := clus.WaitLeader(t)
  80. lead := uint64(clus.Members[leadIdx].ID())
  81. if target != lead {
  82. t.Fatalf("new leader expected %d, got %d", target, lead)
  83. }
  84. }
  85. // TestMaintenanceSnapshotError ensures that context cancel/timeout
  86. // before snapshot reading returns corresponding context errors.
  87. func TestMaintenanceSnapshotError(t *testing.T) {
  88. defer testutil.AfterTest(t)
  89. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  90. defer clus.Terminate(t)
  91. // reading snapshot with canceled context should error out
  92. ctx, cancel := context.WithCancel(context.Background())
  93. rc1, err := clus.RandClient().Snapshot(ctx)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. defer rc1.Close()
  98. cancel()
  99. _, err = io.Copy(ioutil.Discard, rc1)
  100. if err != context.Canceled {
  101. t.Errorf("expected %v, got %v", context.Canceled, err)
  102. }
  103. // reading snapshot with deadline exceeded should error out
  104. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  105. defer cancel()
  106. rc2, err := clus.RandClient().Snapshot(ctx)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. defer rc2.Close()
  111. time.Sleep(2 * time.Second)
  112. _, err = io.Copy(ioutil.Discard, rc2)
  113. if err != nil && err != context.DeadlineExceeded {
  114. t.Errorf("expected %v, got %v", context.DeadlineExceeded, err)
  115. }
  116. }
  117. // TestMaintenanceSnapshotErrorInflight ensures that inflight context cancel/timeout
  118. // fails snapshot reading with corresponding context errors.
  119. func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
  120. defer testutil.AfterTest(t)
  121. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  122. defer clus.Terminate(t)
  123. // take about 1-second to read snapshot
  124. clus.Members[0].Stop(t)
  125. dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
  126. b := backend.NewDefaultBackend(dpath)
  127. s := mvcc.NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  128. rev := 100000
  129. for i := 2; i <= rev; i++ {
  130. s.Put([]byte(fmt.Sprintf("%10d", i)), bytes.Repeat([]byte("a"), 1024), lease.NoLease)
  131. }
  132. s.Close()
  133. b.Close()
  134. clus.Members[0].Restart(t)
  135. // reading snapshot with canceled context should error out
  136. ctx, cancel := context.WithCancel(context.Background())
  137. rc1, err := clus.RandClient().Snapshot(ctx)
  138. if err != nil {
  139. t.Fatal(err)
  140. }
  141. defer rc1.Close()
  142. donec := make(chan struct{})
  143. go func() {
  144. time.Sleep(300 * time.Millisecond)
  145. cancel()
  146. close(donec)
  147. }()
  148. _, err = io.Copy(ioutil.Discard, rc1)
  149. if err != nil && err != context.Canceled {
  150. t.Errorf("expected %v, got %v", context.Canceled, err)
  151. }
  152. <-donec
  153. // reading snapshot with deadline exceeded should error out
  154. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  155. defer cancel()
  156. rc2, err := clus.RandClient().Snapshot(ctx)
  157. if err != nil {
  158. t.Fatal(err)
  159. }
  160. defer rc2.Close()
  161. // 300ms left and expect timeout while snapshot reading is in progress
  162. time.Sleep(700 * time.Millisecond)
  163. _, err = io.Copy(ioutil.Discard, rc2)
  164. if err != nil && !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
  165. t.Errorf("expected %v from gRPC, got %v", context.DeadlineExceeded, err)
  166. }
  167. }