watcher_test.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "bytes"
  17. "os"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/lease"
  22. "github.com/coreos/etcd/storage/backend"
  23. )
  24. // TestWatcherWatchID tests that each watcher provides unique watchID,
  25. // and the watched event attaches the correct watchID.
  26. func TestWatcherWatchID(t *testing.T) {
  27. b, tmpPath := backend.NewDefaultTmpBackend()
  28. s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
  29. defer cleanup(s, b, tmpPath)
  30. w := s.NewWatchStream()
  31. defer w.Close()
  32. idm := make(map[WatchID]struct{})
  33. for i := 0; i < 10; i++ {
  34. id := w.Watch([]byte("foo"), nil, 0)
  35. if _, ok := idm[id]; ok {
  36. t.Errorf("#%d: id %d exists", i, id)
  37. }
  38. idm[id] = struct{}{}
  39. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  40. resp := <-w.Chan()
  41. if resp.WatchID != id {
  42. t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
  43. }
  44. if err := w.Cancel(id); err != nil {
  45. t.Error(err)
  46. }
  47. }
  48. s.Put([]byte("foo2"), []byte("bar"), lease.NoLease)
  49. // unsynced watchers
  50. for i := 10; i < 20; i++ {
  51. id := w.Watch([]byte("foo2"), nil, 1)
  52. if _, ok := idm[id]; ok {
  53. t.Errorf("#%d: id %d exists", i, id)
  54. }
  55. idm[id] = struct{}{}
  56. resp := <-w.Chan()
  57. if resp.WatchID != id {
  58. t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
  59. }
  60. if err := w.Cancel(id); err != nil {
  61. t.Error(err)
  62. }
  63. }
  64. }
  65. // TestWatcherWatchPrefix tests if Watch operation correctly watches
  66. // and returns events with matching prefixes.
  67. func TestWatcherWatchPrefix(t *testing.T) {
  68. b, tmpPath := backend.NewDefaultTmpBackend()
  69. s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
  70. defer cleanup(s, b, tmpPath)
  71. w := s.NewWatchStream()
  72. defer w.Close()
  73. idm := make(map[WatchID]struct{})
  74. val := []byte("bar")
  75. keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar")
  76. for i := 0; i < 10; i++ {
  77. id := w.Watch(keyWatch, keyEnd, 0)
  78. if _, ok := idm[id]; ok {
  79. t.Errorf("#%d: unexpected duplicated id %x", i, id)
  80. }
  81. idm[id] = struct{}{}
  82. s.Put(keyPut, val, lease.NoLease)
  83. resp := <-w.Chan()
  84. if resp.WatchID != id {
  85. t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
  86. }
  87. if err := w.Cancel(id); err != nil {
  88. t.Errorf("#%d: unexpected cancel error %v", i, err)
  89. }
  90. if len(resp.Events) != 1 {
  91. t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
  92. }
  93. if len(resp.Events) == 1 {
  94. if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) {
  95. t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut)
  96. }
  97. }
  98. }
  99. keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar")
  100. s.Put(keyPut1, val, lease.NoLease)
  101. // unsynced watchers
  102. for i := 10; i < 15; i++ {
  103. id := w.Watch(keyWatch1, keyEnd1, 1)
  104. if _, ok := idm[id]; ok {
  105. t.Errorf("#%d: id %d exists", i, id)
  106. }
  107. idm[id] = struct{}{}
  108. resp := <-w.Chan()
  109. if resp.WatchID != id {
  110. t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
  111. }
  112. if err := w.Cancel(id); err != nil {
  113. t.Error(err)
  114. }
  115. if len(resp.Events) != 1 {
  116. t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
  117. }
  118. if len(resp.Events) == 1 {
  119. if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) {
  120. t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1)
  121. }
  122. }
  123. }
  124. }
  125. // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
  126. // with given id inside watchStream.
  127. func TestWatchStreamCancelWatcherByID(t *testing.T) {
  128. b, tmpPath := backend.NewDefaultTmpBackend()
  129. s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
  130. defer cleanup(s, b, tmpPath)
  131. w := s.NewWatchStream()
  132. defer w.Close()
  133. id := w.Watch([]byte("foo"), nil, 0)
  134. tests := []struct {
  135. cancelID WatchID
  136. werr error
  137. }{
  138. // no error should be returned when cancel the created watcher.
  139. {id, nil},
  140. // not exist error should be returned when cancel again.
  141. {id, ErrWatcherNotExist},
  142. // not exist error should be returned when cancel a bad id.
  143. {id + 1, ErrWatcherNotExist},
  144. }
  145. for i, tt := range tests {
  146. gerr := w.Cancel(tt.cancelID)
  147. if gerr != tt.werr {
  148. t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
  149. }
  150. }
  151. if l := len(w.(*watchStream).cancels); l != 0 {
  152. t.Errorf("cancels = %d, want 0", l)
  153. }
  154. }
  155. // TestWatcherRequestProgress ensures synced watcher can correctly
  156. // report its correct progress.
  157. func TestWatcherRequestProgress(t *testing.T) {
  158. b, tmpPath := backend.NewDefaultTmpBackend()
  159. // manually create watchableStore instead of newWatchableStore
  160. // because newWatchableStore automatically calls syncWatchers
  161. // method to sync watchers in unsynced map. We want to keep watchers
  162. // in unsynced to test if syncWatchers works as expected.
  163. s := &watchableStore{
  164. store: NewStore(b, &lease.FakeLessor{}),
  165. unsynced: newWatcherGroup(),
  166. synced: newWatcherGroup(),
  167. }
  168. defer func() {
  169. s.store.Close()
  170. os.Remove(tmpPath)
  171. }()
  172. testKey := []byte("foo")
  173. notTestKey := []byte("bad")
  174. testValue := []byte("bar")
  175. s.Put(testKey, testValue, lease.NoLease)
  176. w := s.NewWatchStream()
  177. badID := WatchID(1000)
  178. w.RequestProgress(badID)
  179. select {
  180. case resp := <-w.Chan():
  181. t.Fatalf("unexpected %+v", resp)
  182. default:
  183. }
  184. id := w.Watch(notTestKey, nil, 1)
  185. w.RequestProgress(id)
  186. select {
  187. case resp := <-w.Chan():
  188. t.Fatalf("unexpected %+v", resp)
  189. default:
  190. }
  191. s.syncWatchers()
  192. w.RequestProgress(id)
  193. wrs := WatchResponse{WatchID: 0, Revision: 2}
  194. select {
  195. case resp := <-w.Chan():
  196. if !reflect.DeepEqual(resp, wrs) {
  197. t.Fatalf("got %+v, expect %+v", resp, wrs)
  198. }
  199. case <-time.After(time.Second):
  200. t.Fatal("failed to receive progress")
  201. }
  202. }