server_shutdown_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "context"
  18. "reflect"
  19. "strings"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/clientv3"
  23. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  24. "github.com/coreos/etcd/integration"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. "google.golang.org/grpc/codes"
  27. "google.golang.org/grpc/status"
  28. "google.golang.org/grpc/transport"
  29. )
  30. // TestBalancerUnderServerShutdownWatch expects that watch client
  31. // switch its endpoints when the member of the pinned endpoint fails.
  32. func TestBalancerUnderServerShutdownWatch(t *testing.T) {
  33. defer testutil.AfterTest(t)
  34. clus := integration.NewClusterV3(t, &integration.ClusterConfig{
  35. Size: 3,
  36. SkipCreatingClient: true,
  37. })
  38. defer clus.Terminate(t)
  39. eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
  40. lead := clus.WaitLeader(t)
  41. // pin eps[lead]
  42. watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[lead]}})
  43. if err != nil {
  44. t.Fatal(err)
  45. }
  46. defer watchCli.Close()
  47. // wait for eps[lead] to be pinned
  48. mustWaitPinReady(t, watchCli)
  49. // add all eps to list, so that when the original pined one fails
  50. // the client can switch to other available eps
  51. watchCli.SetEndpoints(eps...)
  52. key, val := "foo", "bar"
  53. wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
  54. select {
  55. case <-wch:
  56. case <-time.After(integration.RequestWaitTimeout):
  57. t.Fatal("took too long to create watch")
  58. }
  59. donec := make(chan struct{})
  60. go func() {
  61. defer close(donec)
  62. // switch to others when eps[lead] is shut down
  63. select {
  64. case ev := <-wch:
  65. if werr := ev.Err(); werr != nil {
  66. t.Fatal(werr)
  67. }
  68. if len(ev.Events) != 1 {
  69. t.Fatalf("expected one event, got %+v", ev)
  70. }
  71. if !bytes.Equal(ev.Events[0].Kv.Value, []byte(val)) {
  72. t.Fatalf("expected %q, got %+v", val, ev.Events[0].Kv)
  73. }
  74. case <-time.After(7 * time.Second):
  75. t.Fatal("took too long to receive events")
  76. }
  77. }()
  78. // shut down eps[lead]
  79. clus.Members[lead].Terminate(t)
  80. // writes to eps[lead+1]
  81. putCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[(lead+1)%3]}})
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. defer putCli.Close()
  86. for {
  87. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  88. _, err = putCli.Put(ctx, key, val)
  89. cancel()
  90. if err == nil {
  91. break
  92. }
  93. if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout || err == rpctypes.ErrTimeoutDueToLeaderFail {
  94. continue
  95. }
  96. t.Fatal(err)
  97. }
  98. select {
  99. case <-donec:
  100. case <-time.After(5 * time.Second): // enough time for balancer switch
  101. t.Fatal("took too long to receive events")
  102. }
  103. }
  104. func TestBalancerUnderServerShutdownPut(t *testing.T) {
  105. testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
  106. _, err := cli.Put(ctx, "foo", "bar")
  107. return err
  108. })
  109. }
  110. func TestBalancerUnderServerShutdownDelete(t *testing.T) {
  111. testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
  112. _, err := cli.Delete(ctx, "foo")
  113. return err
  114. })
  115. }
  116. func TestBalancerUnderServerShutdownTxn(t *testing.T) {
  117. testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
  118. _, err := cli.Txn(ctx).
  119. If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
  120. Then(clientv3.OpPut("foo", "bar")).
  121. Else(clientv3.OpPut("foo", "baz")).Commit()
  122. return err
  123. })
  124. }
  125. // testBalancerUnderServerShutdownMutable expects that when the member of
  126. // the pinned endpoint is shut down, the balancer switches its endpoints
  127. // and all subsequent put/delete/txn requests succeed with new endpoints.
  128. func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Client, context.Context) error) {
  129. defer testutil.AfterTest(t)
  130. clus := integration.NewClusterV3(t, &integration.ClusterConfig{
  131. Size: 3,
  132. SkipCreatingClient: true,
  133. })
  134. defer clus.Terminate(t)
  135. eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
  136. // pin eps[0]
  137. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}})
  138. if err != nil {
  139. t.Fatal(err)
  140. }
  141. defer cli.Close()
  142. // wait for eps[0] to be pinned
  143. mustWaitPinReady(t, cli)
  144. // add all eps to list, so that when the original pined one fails
  145. // the client can switch to other available eps
  146. cli.SetEndpoints(eps...)
  147. // shut down eps[0]
  148. clus.Members[0].Terminate(t)
  149. // switched to others when eps[0] was explicitly shut down
  150. // and following request should succeed
  151. // TODO: remove this (expose client connection state?)
  152. time.Sleep(time.Second)
  153. cctx, ccancel := context.WithTimeout(context.Background(), time.Second)
  154. err = op(cli, cctx)
  155. ccancel()
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. }
  160. func TestBalancerUnderServerShutdownGetLinearizable(t *testing.T) {
  161. testBalancerUnderServerShutdownImmutable(t, func(cli *clientv3.Client, ctx context.Context) error {
  162. _, err := cli.Get(ctx, "foo")
  163. return err
  164. }, 7*time.Second) // give enough time for leader election, balancer switch
  165. }
  166. func TestBalancerUnderServerShutdownGetSerializable(t *testing.T) {
  167. testBalancerUnderServerShutdownImmutable(t, func(cli *clientv3.Client, ctx context.Context) error {
  168. _, err := cli.Get(ctx, "foo", clientv3.WithSerializable())
  169. return err
  170. }, 2*time.Second)
  171. }
  172. // testBalancerUnderServerShutdownImmutable expects that when the member of
  173. // the pinned endpoint is shut down, the balancer switches its endpoints
  174. // and all subsequent range requests succeed with new endpoints.
  175. func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Client, context.Context) error, timeout time.Duration) {
  176. defer testutil.AfterTest(t)
  177. clus := integration.NewClusterV3(t, &integration.ClusterConfig{
  178. Size: 3,
  179. SkipCreatingClient: true,
  180. })
  181. defer clus.Terminate(t)
  182. eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
  183. // pin eps[0]
  184. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}})
  185. if err != nil {
  186. t.Errorf("failed to create client: %v", err)
  187. }
  188. defer cli.Close()
  189. // wait for eps[0] to be pinned
  190. mustWaitPinReady(t, cli)
  191. // add all eps to list, so that when the original pined one fails
  192. // the client can switch to other available eps
  193. cli.SetEndpoints(eps...)
  194. // shut down eps[0]
  195. clus.Members[0].Terminate(t)
  196. // switched to others when eps[0] was explicitly shut down
  197. // and following request should succeed
  198. cctx, ccancel := context.WithTimeout(context.Background(), timeout)
  199. err = op(cli, cctx)
  200. ccancel()
  201. if err != nil {
  202. t.Errorf("failed to finish range request in time %v (timeout %v)", err, timeout)
  203. }
  204. }
  205. func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) {
  206. tt := []pinTestOpt{
  207. {pinLeader: true, stopPinFirst: true},
  208. {pinLeader: true, stopPinFirst: false},
  209. {pinLeader: false, stopPinFirst: true},
  210. {pinLeader: false, stopPinFirst: false},
  211. }
  212. for i := range tt {
  213. testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i])
  214. }
  215. }
  216. func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) {
  217. tt := []pinTestOpt{
  218. {pinLeader: true, stopPinFirst: true},
  219. {pinLeader: true, stopPinFirst: false},
  220. {pinLeader: false, stopPinFirst: true},
  221. {pinLeader: false, stopPinFirst: false},
  222. }
  223. for i := range tt {
  224. testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i])
  225. }
  226. }
  227. type pinTestOpt struct {
  228. pinLeader bool
  229. stopPinFirst bool
  230. }
  231. // testBalancerUnderServerStopInflightRangeOnRestart expects
  232. // inflight range request reconnects on server restart.
  233. func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) {
  234. defer testutil.AfterTest(t)
  235. cfg := &integration.ClusterConfig{
  236. Size: 2,
  237. SkipCreatingClient: true,
  238. }
  239. if linearizable {
  240. cfg.Size = 3
  241. }
  242. clus := integration.NewClusterV3(t, cfg)
  243. defer clus.Terminate(t)
  244. eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
  245. if linearizable {
  246. eps = append(eps, clus.Members[2].GRPCAddr())
  247. }
  248. lead := clus.WaitLeader(t)
  249. target := lead
  250. if !opt.pinLeader {
  251. target = (target + 1) % 2
  252. }
  253. // pin eps[target]
  254. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
  255. if err != nil {
  256. t.Errorf("failed to create client: %v", err)
  257. }
  258. defer cli.Close()
  259. // wait for eps[target] to be pinned
  260. mustWaitPinReady(t, cli)
  261. // add all eps to list, so that when the original pined one fails
  262. // the client can switch to other available eps
  263. cli.SetEndpoints(eps...)
  264. if opt.stopPinFirst {
  265. clus.Members[target].Stop(t)
  266. // give some time for balancer switch before stopping the other
  267. time.Sleep(time.Second)
  268. clus.Members[(target+1)%2].Stop(t)
  269. } else {
  270. clus.Members[(target+1)%2].Stop(t)
  271. // balancer cannot pin other member since it's already stopped
  272. clus.Members[target].Stop(t)
  273. }
  274. // 3-second is the minimum interval between endpoint being marked
  275. // as unhealthy and being removed from unhealthy, so possibly
  276. // takes >5-second to unpin and repin an endpoint
  277. // TODO: decrease timeout when balancer switch rewrite
  278. clientTimeout := 7 * time.Second
  279. var gops []clientv3.OpOption
  280. if !linearizable {
  281. gops = append(gops, clientv3.WithSerializable())
  282. }
  283. donec, readyc := make(chan struct{}), make(chan struct{}, 1)
  284. go func() {
  285. defer close(donec)
  286. ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
  287. readyc <- struct{}{}
  288. _, err := cli.Get(ctx, "abc", gops...)
  289. cancel()
  290. if err != nil {
  291. if linearizable && isServerUnavailable(err) {
  292. t.Logf("TODO: FIX THIS after balancer rewrite! %v %v", reflect.TypeOf(err), err)
  293. } else {
  294. t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err)
  295. }
  296. }
  297. }()
  298. <-readyc
  299. clus.Members[target].Restart(t)
  300. select {
  301. case <-time.After(clientTimeout + integration.RequestWaitTimeout):
  302. t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
  303. case <-donec:
  304. }
  305. }
  306. // e.g. due to clock drifts in server-side,
  307. // client context times out first in server-side
  308. // while original client-side context is not timed out yet
  309. func isServerCtxTimeout(err error) bool {
  310. if err == nil {
  311. return false
  312. }
  313. ev, ok := status.FromError(err)
  314. if !ok {
  315. return false
  316. }
  317. code := ev.Code()
  318. return code == codes.DeadlineExceeded && strings.Contains(err.Error(), "context deadline exceeded")
  319. }
  320. // In grpc v1.11.3+ dial timeouts can error out with transport.ErrConnClosing. Previously dial timeouts
  321. // would always error out with context.DeadlineExceeded.
  322. func isClientTimeout(err error) bool {
  323. if err == nil {
  324. return false
  325. }
  326. if err == context.DeadlineExceeded {
  327. return true
  328. }
  329. ev, ok := status.FromError(err)
  330. if !ok {
  331. return false
  332. }
  333. code := ev.Code()
  334. return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc
  335. }
  336. func isServerUnavailable(err error) bool {
  337. if err == nil {
  338. return false
  339. }
  340. ev, ok := status.FromError(err)
  341. if !ok {
  342. return false
  343. }
  344. code := ev.Code()
  345. return code == codes.Unavailable
  346. }