watch_test.go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"fmt"
	"math/rand"
	"reflect"
	"sort"
	"testing"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"github.com/coreos/etcd/integration"
	mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/testutil"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

type watcherTest func(*testing.T, *watchctx)

type watchctx struct {
	clus          *integration.ClusterV3
	w             clientv3.Watcher
	wclient       *clientv3.Client
	kv            clientv3.KV
	wclientMember int
	kvMember      int
	ch            clientv3.WatchChan
}

func runWatchTest(t *testing.T, f watcherTest) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	wclientMember := rand.Intn(3)
	wclient := clus.Client(wclientMember)
	w := clientv3.NewWatcher(wclient)
	defer w.Close()
	// select a different client from wclient so puts succeed if
	// a test knocks out the watcher client
	kvMember := rand.Intn(3)
	for kvMember == wclientMember {
		kvMember = rand.Intn(3)
	}
	kvclient := clus.Client(kvMember)
	kv := clientv3.NewKV(kvclient)

	wctx := &watchctx{clus, w, wclient, kv, wclientMember, kvMember, nil}

	f(t, wctx)
}

// TestWatchMultiWatcher modifies multiple keys and observes the changes.
func TestWatchMultiWatcher(t *testing.T) {
	runWatchTest(t, testWatchMultiWatcher)
}

func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
	numKeyUpdates := 4
	keys := []string{"foo", "bar", "baz"}

	donec := make(chan struct{})
	readyc := make(chan struct{})
	for _, k := range keys {
		// key watcher
		go func(key string) {
			ch := wctx.w.Watch(context.TODO(), key)
			if ch == nil {
				t.Fatalf("expected watcher channel, got nil")
			}
			readyc <- struct{}{}
			for i := 0; i < numKeyUpdates; i++ {
				resp, ok := <-ch
				if !ok {
					t.Fatalf("watcher unexpectedly closed")
				}
				v := fmt.Sprintf("%s-%d", key, i)
				gotv := string(resp.Events[0].Kv.Value)
				if gotv != v {
					t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
				}
			}
			donec <- struct{}{}
		}(k)
	}
	// prefix watcher on "b" (bar and baz)
	go func() {
		prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
		if prefixc == nil {
			t.Fatalf("expected watcher channel, got nil")
		}
		readyc <- struct{}{}
		evs := []*clientv3.Event{}
		for i := 0; i < numKeyUpdates*2; i++ {
			resp, ok := <-prefixc
			if !ok {
				t.Fatalf("watcher unexpectedly closed")
			}
			evs = append(evs, resp.Events...)
		}

		// check response
		expected := []string{}
		bkeys := []string{"bar", "baz"}
		for _, k := range bkeys {
			for i := 0; i < numKeyUpdates; i++ {
				expected = append(expected, fmt.Sprintf("%s-%d", k, i))
			}
		}
		got := []string{}
		for _, ev := range evs {
			got = append(got, string(ev.Kv.Value))
		}
		sort.Strings(got)
		if !reflect.DeepEqual(expected, got) {
			t.Errorf("got %v, expected %v", got, expected)
		}

		// ensure no extra data
		select {
		case resp, ok := <-prefixc:
			if !ok {
				t.Fatalf("watcher unexpectedly closed")
			}
			t.Fatalf("unexpected event %+v", resp)
		case <-time.After(time.Second):
		}
		donec <- struct{}{}
	}()

	// wait for all watchers to be ready
	for i := 0; i < len(keys)+1; i++ {
		<-readyc
	}

	// generate events
	ctx := context.TODO()
	for i := 0; i < numKeyUpdates; i++ {
		for _, k := range keys {
			v := fmt.Sprintf("%s-%d", k, i)
			if _, err := wctx.kv.Put(ctx, k, v); err != nil {
				t.Fatal(err)
			}
		}
	}

	// wait for all watchers to shut down
	for i := 0; i < len(keys)+1; i++ {
		<-donec
	}
}

// TestWatchRange tests watchers created over a key range.
func TestWatchRange(t *testing.T) {
	runWatchTest(t, testWatchRange)
}

func testWatchRange(t *testing.T, wctx *watchctx) {
	if wctx.ch = wctx.w.Watch(context.TODO(), "a", clientv3.WithRange("c")); wctx.ch == nil {
		t.Fatalf("expected non-nil channel")
	}
	putAndWatch(t, wctx, "a", "a")
	putAndWatch(t, wctx, "b", "b")
	putAndWatch(t, wctx, "bar", "bar")
}

// TestWatchReconnRequest tests the send failure path when requesting a watcher.
func TestWatchReconnRequest(t *testing.T) {
	runWatchTest(t, testWatchReconnRequest)
}

func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
	donec, stopc := make(chan struct{}), make(chan struct{}, 1)
	go func() {
		timer := time.After(2 * time.Second)
		defer close(donec)
		// take down watcher connection
		for {
			wctx.clus.Members[wctx.wclientMember].DropConnections()
			select {
			case <-timer:
				// spinning on close may live lock reconnection
				return
			case <-stopc:
				return
			default:
			}
		}
	}()
	// should reconnect when requesting watch
	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
		t.Fatalf("expected non-nil channel")
	}
	// wait for disconnections to stop
	stopc <- struct{}{}
	<-donec

	// spinning on dropping connections may trigger a leader election
	// due to resource starvation; do a linearizable read to ensure the
	// cluster is stable
	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
	if _, err := wctx.kv.Get(ctx, "_"); err != nil {
		t.Fatal(err)
	}
	cancel()

	// ensure watcher works
	putAndWatch(t, wctx, "a", "a")
}

// TestWatchReconnInit tests that the watcher resumes correctly if the
// connection is lost before any data was sent.
func TestWatchReconnInit(t *testing.T) {
	runWatchTest(t, testWatchReconnInit)
}

func testWatchReconnInit(t *testing.T, wctx *watchctx) {
	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
		t.Fatalf("expected non-nil channel")
	}
	wctx.clus.Members[wctx.wclientMember].DropConnections()
	// watcher should recover
	putAndWatch(t, wctx, "a", "a")
}

// TestWatchReconnRunning tests that the watcher resumes correctly if the
// connection is lost after data was sent.
func TestWatchReconnRunning(t *testing.T) {
	runWatchTest(t, testWatchReconnRunning)
}

func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
		t.Fatalf("expected non-nil channel")
	}
	putAndWatch(t, wctx, "a", "a")
	// take down watcher connection
	wctx.clus.Members[wctx.wclientMember].DropConnections()
	// watcher should recover
	putAndWatch(t, wctx, "a", "b")
}

// TestWatchCancelImmediate ensures a closed channel is returned
// if the context is cancelled.
func TestWatchCancelImmediate(t *testing.T) {
	runWatchTest(t, testWatchCancelImmediate)
}

func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	wch := wctx.w.Watch(ctx, "a")
	select {
	case wresp, ok := <-wch:
		if ok {
			t.Fatalf("read wch got %v; expected closed channel", wresp)
		}
	default:
		t.Fatalf("closed watcher channel should not block")
	}
}

// TestWatchCancelInit tests watcher closes correctly after no events.
func TestWatchCancelInit(t *testing.T) {
	runWatchTest(t, testWatchCancelInit)
}

func testWatchCancelInit(t *testing.T, wctx *watchctx) {
	ctx, cancel := context.WithCancel(context.Background())
	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
		t.Fatalf("expected non-nil watcher channel")
	}
	cancel()
	select {
	case <-time.After(time.Second):
		t.Fatalf("took too long to cancel")
	case _, ok := <-wctx.ch:
		if ok {
			t.Fatalf("expected watcher channel to close")
		}
	}
}

// TestWatchCancelRunning tests watcher closes correctly after events.
func TestWatchCancelRunning(t *testing.T) {
	runWatchTest(t, testWatchCancelRunning)
}

func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
	ctx, cancel := context.WithCancel(context.Background())
	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
		t.Fatalf("expected non-nil watcher channel")
	}
	if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {
		t.Fatal(err)
	}
	cancel()
	select {
	case <-time.After(time.Second):
		t.Fatalf("took too long to cancel")
	case v, ok := <-wctx.ch:
		if !ok {
			// closed before getting put; OK
			break
		}
		// got the PUT; should close next
		select {
		case <-time.After(time.Second):
			t.Fatalf("took too long to close")
		case v, ok = <-wctx.ch:
			if ok {
				t.Fatalf("expected watcher channel to close, got %v", v)
			}
		}
	}
}

func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
	if _, err := wctx.kv.Put(context.TODO(), key, val); err != nil {
		t.Fatal(err)
	}
	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("watch timed out")
	case v, ok := <-wctx.ch:
		if !ok {
			t.Fatalf("unexpected watch close")
		}
		if string(v.Events[0].Kv.Value) != val {
			t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
		}
	}
}

// TestWatchResumeCompacted checks that the watcher gracefully closes when it
// tries to resume to a revision that's been compacted out of the store.
// Since the watcher's server restarts with stale data, the watcher will either
// receive a compaction error or stay in sync and read off all keys before the
// compaction is finally applied.
func TestWatchResumeCompacted(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// create a waiting watcher at rev 1
	w := clientv3.NewWatcher(clus.Client(0))
	defer w.Close()
	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1))
	select {
	case w := <-wch:
		t.Errorf("unexpected message from wch %v", w)
	default:
	}
	clus.Members[0].Stop(t)

	ticker := time.After(time.Second * 10)
	for clus.WaitLeader(t) <= 0 {
		select {
		case <-ticker:
			t.Fatalf("failed to wait for new leader")
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}

	// put some data and compact away
	numPuts := 5
	kv := clientv3.NewKV(clus.Client(1))
	for i := 0; i < numPuts; i++ {
		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
			t.Fatal(err)
		}
	}
	if _, err := kv.Compact(context.TODO(), 3); err != nil {
		t.Fatal(err)
	}

	clus.Members[0].Restart(t)

	// since the watch's server isn't guaranteed to be synced with the cluster
	// when the watch resumes, there is a window where the watch can stay synced
	// and read off all events; if the watcher misses the window, it will go out
	// of sync and get a compaction error.
	wRev := int64(2)
	for int(wRev) <= numPuts+1 {
		var wresp clientv3.WatchResponse
		var ok bool
		select {
		case wresp, ok = <-wch:
			if !ok {
				t.Fatalf("expected wresp, but got closed channel")
			}
		case <-time.After(5 * time.Second):
			t.Fatalf("compacted watch timed out")
		}
		for _, ev := range wresp.Events {
			if ev.Kv.ModRevision != wRev {
				t.Fatalf("expected modRev %v, got %+v", wRev, ev)
			}
			wRev++
		}
		if wresp.Err() == nil {
			continue
		}
		if wresp.Err() != rpctypes.ErrCompacted {
			t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
		}
		break
	}
	if int(wRev) > numPuts+1 {
		// got data faster than the compaction
		return
	}
	// received compaction error; ensure the channel closes
	select {
	case wresp, ok := <-wch:
		if ok {
			t.Fatalf("expected closed channel, but got %v", wresp)
		}
	case <-time.After(5 * time.Second):
		t.Fatalf("timed out waiting for channel close")
	}
}

// TestWatchCompactRevision ensures the CompactRevision error is given on a
// compaction event ahead of a watcher.
func TestWatchCompactRevision(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	// set some keys
	kv := clientv3.NewKV(clus.RandClient())
	for i := 0; i < 5; i++ {
		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
			t.Fatal(err)
		}
	}

	w := clientv3.NewWatcher(clus.RandClient())
	defer w.Close()

	if _, err := kv.Compact(context.TODO(), 4); err != nil {
		t.Fatal(err)
	}
	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))

	// get compacted error message
	wresp, ok := <-wch
	if !ok {
		t.Fatalf("expected wresp, but got closed channel")
	}
	if wresp.Err() != rpctypes.ErrCompacted {
		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
	}

	// ensure the channel is closed
	if wresp, ok = <-wch; ok {
		t.Fatalf("expected closed channel, but got %v", wresp)
	}
}

func TestWatchWithProgressNotify(t *testing.T)        { testWatchWithProgressNotify(t, true) }
func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }

func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
	defer testutil.AfterTest(t)

	// accelerate report interval so test terminates quickly
	oldpi := v3rpc.GetProgressReportInterval()
	// using atomics to avoid race warnings
	v3rpc.SetProgressReportInterval(3 * time.Second)
	pi := 3 * time.Second
	defer func() { v3rpc.SetProgressReportInterval(oldpi) }()

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	wc := clientv3.NewWatcher(clus.RandClient())
	defer wc.Close()

	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
	if watchOnPut {
		opts = append(opts, clientv3.WithPrefix())
	}
	rch := wc.Watch(context.Background(), "foo", opts...)

	select {
	case resp := <-rch: // wait for notification
		if len(resp.Events) != 0 {
			t.Fatalf("resp.Events expected none, got %+v", resp.Events)
		}
	case <-time.After(2 * pi):
		t.Fatalf("watch response expected in %v, but timed out", pi)
	}

	kvc := clientv3.NewKV(clus.RandClient())
	if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
		t.Fatal(err)
	}

	select {
	case resp := <-rch:
		if resp.Header.Revision != 2 {
			t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
		}
		if watchOnPut { // wait for put if watch on the put key
			ev := []*clientv3.Event{{Type: clientv3.EventTypePut,
				Kv: &mvccpb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
			if !reflect.DeepEqual(ev, resp.Events) {
				t.Fatalf("expected %+v, got %+v", ev, resp.Events)
			}
		} else if len(resp.Events) != 0 { // wait for notification otherwise
			t.Fatalf("expected no events, but got %+v", resp.Events)
		}
	case <-time.After(time.Duration(1.5 * float64(pi))):
		t.Fatalf("watch response expected in %v, but timed out", pi)
	}
}

func TestWatchEventType(t *testing.T) {
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	client := cluster.RandClient()
	ctx := context.Background()
	watchChan := client.Watch(ctx, "/", clientv3.WithPrefix())

	if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil {
		t.Fatalf("Put failed: %v", err)
	}
	if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil {
		t.Fatalf("Put failed: %v", err)
	}
	if _, err := client.Delete(ctx, "/toDelete"); err != nil {
		t.Fatalf("Delete failed: %v", err)
	}
	lcr, err := client.Lease.Grant(ctx, 1)
	if err != nil {
		t.Fatalf("lease create failed: %v", err)
	}
	if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(lcr.ID)); err != nil {
		t.Fatalf("Put failed: %v", err)
	}

	tests := []struct {
		et       mvccpb.Event_EventType
		isCreate bool
		isModify bool
	}{{
		et:       clientv3.EventTypePut,
		isCreate: true,
	}, {
		et:       clientv3.EventTypePut,
		isModify: true,
	}, {
		et: clientv3.EventTypeDelete,
	}, {
		et:       clientv3.EventTypePut,
		isCreate: true,
	}, {
		et: clientv3.EventTypeDelete,
	}}

	var res []*clientv3.Event

	for {
		select {
		case wres := <-watchChan:
			res = append(res, wres.Events...)
		case <-time.After(10 * time.Second):
			t.Fatalf("Should receive %d events and then break out loop", len(tests))
		}
		if len(res) == len(tests) {
			break
		}
	}

	for i, tt := range tests {
		ev := res[i]
		if tt.et != ev.Type {
			t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type)
		}
		if tt.isCreate && !ev.IsCreate() {
			t.Errorf("#%d: event should be CreateEvent", i)
		}
		if tt.isModify && !ev.IsModify() {
			t.Errorf("#%d: event should be ModifyEvent", i)
		}
	}
}

func TestWatchErrConnClosed(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	cli := clus.Client(0)
	wc := clientv3.NewWatcher(cli)

	donec := make(chan struct{})
	go func() {
		defer close(donec)
		wc.Watch(context.TODO(), "foo")
		if err := wc.Close(); err != nil && err != grpc.ErrClientConnClosing {
			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
		}
	}()

	if err := cli.Close(); err != nil {
		t.Fatal(err)
	}
	clus.TakeClient(0)

	select {
	case <-time.After(3 * time.Second):
		t.Fatal("wc.Watch took too long")
	case <-donec:
	}
}

func TestWatchAfterClose(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	cli := clus.Client(0)
	clus.TakeClient(0)
	if err := cli.Close(); err != nil {
		t.Fatal(err)
	}

	donec := make(chan struct{})
	go func() {
		wc := clientv3.NewWatcher(cli)
		wc.Watch(context.TODO(), "foo")
		if err := wc.Close(); err != nil && err != grpc.ErrClientConnClosing {
			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
		}
		close(donec)
	}()
	select {
	case <-time.After(3 * time.Second):
		t.Fatal("wc.Watch took too long")
	case <-donec:
	}
}

// TestWatchWithRequireLeader checks that the watch channel closes when there is no leader.
func TestWatchWithRequireLeader(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// something for the non-require leader watch to read as an event
	if _, err := clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
		t.Fatal(err)
	}

	clus.Members[1].Stop(t)
	clus.Members[2].Stop(t)
	clus.Client(1).Close()
	clus.Client(2).Close()
	clus.TakeClient(1)
	clus.TakeClient(2)

	// wait for election timeout, then member[0] will not have a leader.
	tickDuration := 10 * time.Millisecond
	time.Sleep(time.Duration(3*clus.Members[0].ElectionTicks) * tickDuration)

	chLeader := clus.Client(0).Watch(clientv3.WithRequireLeader(context.TODO()), "foo", clientv3.WithRev(1))
	chNoLeader := clus.Client(0).Watch(context.TODO(), "foo", clientv3.WithRev(1))

	select {
	case resp, ok := <-chLeader:
		if !ok {
			t.Fatalf("expected %v watch channel, got closed channel", rpctypes.ErrNoLeader)
		}
		if resp.Err() != rpctypes.ErrNoLeader {
			t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("watch without leader took too long to close")
	}

	select {
	case resp, ok := <-chLeader:
		if ok {
			t.Fatalf("expected closed channel, got response %v", resp)
		}
	case <-time.After(3 * time.Second):
		t.Fatal("waited too long for channel to close")
	}

	if _, ok := <-chNoLeader; !ok {
		t.Fatalf("expected response, got closed channel")
	}
}

// TestWatchWithFilter checks that watch filtering works.
func TestWatchWithFilter(t *testing.T) {
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	client := cluster.RandClient()
	ctx := context.Background()

	wcNoPut := client.Watch(ctx, "a", clientv3.WithFilterPut())
	wcNoDel := client.Watch(ctx, "a", clientv3.WithFilterDelete())

	if _, err := client.Put(ctx, "a", "abc"); err != nil {
		t.Fatal(err)
	}
	if _, err := client.Delete(ctx, "a"); err != nil {
		t.Fatal(err)
	}

	npResp := <-wcNoPut
	if len(npResp.Events) != 1 || npResp.Events[0].Type != clientv3.EventTypeDelete {
		t.Fatalf("expected delete event, got %+v", npResp.Events)
	}
	ndResp := <-wcNoDel
	if len(ndResp.Events) != 1 || ndResp.Events[0].Type != clientv3.EventTypePut {
		t.Fatalf("expected put event, got %+v", ndResp.Events)
	}

	select {
	case resp := <-wcNoPut:
		t.Fatalf("unexpected event on filtered put (%+v)", resp)
	case resp := <-wcNoDel:
		t.Fatalf("unexpected event on filtered delete (%+v)", resp)
	case <-time.After(100 * time.Millisecond):
	}
}

// TestWatchWithCreatedNotification checks that createdNotification works.
func TestWatchWithCreatedNotification(t *testing.T) {
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	client := cluster.RandClient()
	ctx := context.Background()

	createC := client.Watch(ctx, "a", clientv3.WithCreatedNotify())

	resp := <-createC
	if !resp.Created {
		t.Fatalf("expected created event, got %v", resp)
	}
}

// TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
func TestWatchCancelOnServer(t *testing.T) {
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	client := cluster.RandClient()

	for i := 0; i < 10; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		client.Watch(ctx, "a", clientv3.WithCreatedNotify())
		cancel()
	}
	// wait for cancels to propagate
	time.Sleep(time.Second)

	watchers, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
	if err != nil {
		t.Fatal(err)
	}
	if watchers != "0" {
		t.Fatalf("expected 0 watchers, got %q", watchers)
	}
}

// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
// creating/canceling watchers to ensure that new watchers are not taken down
// by a torn down watch stream. The sort of race that's being detected:
// 1. create w1 using a cancelable ctx with %v as "ctx"
// 2. cancel ctx
// 3. watcher client begins tearing down watcher grpc stream since no more watchers
// 4. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
// 5. watcher client finishes tearing down stream on "ctx"
// 6. w2 comes back canceled
func TestWatchOverlapContextCancel(t *testing.T) {
	f := func(clus *integration.ClusterV3) {}
	testWatchOverlapContextCancel(t, f)
}

func TestWatchOverlapDropConnContextCancel(t *testing.T) {
	f := func(clus *integration.ClusterV3) {
		clus.Members[0].DropConnections()
	}
	testWatchOverlapContextCancel(t, f)
}

func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	// each unique context "%v" has a unique grpc stream
	n := 100
	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
	for i := range ctxs {
		// make "%v" unique
		ctxs[i] = context.WithValue(context.TODO(), "key", i)
		// limits the maximum number of outstanding watchers per stream
		ctxc[i] = make(chan struct{}, 2)
	}

	// issue concurrent watches on "abc" with cancel
	cli := clus.RandClient()
	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
		t.Fatal(err)
	}
	ch := make(chan struct{}, n)
	for i := 0; i < n; i++ {
		go func() {
			defer func() { ch <- struct{}{} }()
			idx := rand.Intn(len(ctxs))
			ctx, cancel := context.WithCancel(ctxs[idx])
			ctxc[idx] <- struct{}{}
			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
			f(clus)
			select {
			case _, ok := <-wch:
				if !ok {
					t.Fatalf("unexpected closed channel %p", wch)
				}
			// may take a second or two to reestablish a watcher because of
			// grpc backoff policies for disconnects
			case <-time.After(5 * time.Second):
				t.Errorf("timed out waiting for watch on %p", wch)
			}
			// randomize how cancel overlaps with watch creation
			if rand.Intn(2) == 0 {
				<-ctxc[idx]
				cancel()
			} else {
				cancel()
				<-ctxc[idx]
			}
		}()
	}
	// join on watches
	for i := 0; i < n; i++ {
		select {
		case <-ch:
		case <-time.After(5 * time.Second):
			t.Fatalf("timed out waiting for completed watch")
		}
	}
}

// TestWatchCancelAndCloseClient ensures that canceling a watcher then immediately
// closing the client does not return a client closing error.
func TestWatchCancelAndCloseClient(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)
	cli := clus.Client(0)

	ctx, cancel := context.WithCancel(context.Background())
	wch := cli.Watch(ctx, "abc")
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		select {
		case wr, ok := <-wch:
			if ok {
				t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
			}
		case <-time.After(5 * time.Second):
			t.Fatal("timed out waiting for closed channel")
		}
	}()
	cancel()
	if err := cli.Close(); err != nil {
		t.Fatal(err)
	}
	<-donec
	clus.TakeClient(0)
}

// TestWatchStressResumeClose establishes a bunch of watchers, disconnects
// to put them in resuming mode, cancels them so some resumes fail on the
// cancel, then closes the watcher interface to ensure correct clean up.
func TestWatchStressResumeClose(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)
	cli := clus.Client(0)

	ctx, cancel := context.WithCancel(context.Background())
	// add more watches than can be resumed before the cancel
	wchs := make([]clientv3.WatchChan, 2000)
	for i := range wchs {
		wchs[i] = cli.Watch(ctx, "abc")
	}
	clus.Members[0].DropConnections()
	cancel()
	if err := cli.Close(); err != nil {
		t.Fatal(err)
	}
	clus.TakeClient(0)
}