watchable_store_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "os"
  17. "testing"
  18. )
  19. func TestWatch(t *testing.T) {
  20. s := newWatchableStore(tmpPath)
  21. defer func() {
  22. s.store.Close()
  23. os.Remove(tmpPath)
  24. }()
  25. testKey := []byte("foo")
  26. testValue := []byte("bar")
  27. s.Put(testKey, testValue)
  28. w := s.NewWatcher()
  29. w.Watch(testKey, true, 0)
  30. if _, ok := s.synced[string(testKey)]; !ok {
  31. // the key must have had an entry in synced
  32. t.Errorf("existence = %v, want true", ok)
  33. }
  34. }
  35. func TestNewWatcherCancel(t *testing.T) {
  36. s := newWatchableStore(tmpPath)
  37. defer func() {
  38. s.store.Close()
  39. os.Remove(tmpPath)
  40. }()
  41. testKey := []byte("foo")
  42. testValue := []byte("bar")
  43. s.Put(testKey, testValue)
  44. w := s.NewWatcher()
  45. _, cancel := w.Watch(testKey, true, 0)
  46. cancel()
  47. if _, ok := s.synced[string(testKey)]; ok {
  48. // the key shoud have been deleted
  49. t.Errorf("existence = %v, want false", ok)
  50. }
  51. }
  52. // TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced.
  53. func TestCancelUnsynced(t *testing.T) {
  54. // manually create watchableStore instead of newWatchableStore
  55. // because newWatchableStore automatically calls syncWatchers
  56. // method to sync watchers in unsynced map. We want to keep watchers
  57. // in unsynced to test if syncWatchers works as expected.
  58. s := &watchableStore{
  59. store: newStore(tmpPath),
  60. unsynced: make(map[*watching]struct{}),
  61. // to make the test not crash from assigning to nil map.
  62. // 'synced' doesn't get populated in this test.
  63. synced: make(map[string]map[*watching]struct{}),
  64. }
  65. defer func() {
  66. s.store.Close()
  67. os.Remove(tmpPath)
  68. }()
  69. // Put a key so that we can spawn watchers on that key.
  70. // (testKey in this test). This increases the rev to 1,
  71. // and later we can we set the watcher's startRev to 1,
  72. // and force watchers to be in unsynced.
  73. testKey := []byte("foo")
  74. testValue := []byte("bar")
  75. s.Put(testKey, testValue)
  76. w := s.NewWatcher()
  77. // arbitrary number for watchers
  78. watcherN := 100
  79. // create watcherN of CancelFunc of
  80. // synced and unsynced
  81. cancels := make([]CancelFunc, watcherN)
  82. for i := 0; i < watcherN; i++ {
  83. // use 1 to keep watchers in unsynced
  84. _, cancel := w.Watch(testKey, true, 1)
  85. cancels[i] = cancel
  86. }
  87. for idx := range cancels {
  88. cancels[idx]()
  89. }
  90. // After running CancelFunc
  91. //
  92. // unsynced should be empty
  93. // because cancel removes watching from unsynced
  94. if len(s.unsynced) != 0 {
  95. t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
  96. }
  97. }
  98. // TestSyncWatchings populates unsynced watching map and
  99. // tests syncWatchings method to see if it correctly sends
  100. // events to channel of unsynced watchings and moves these
  101. // watchings to synced.
  102. func TestSyncWatchings(t *testing.T) {
  103. s := &watchableStore{
  104. store: newStore(tmpPath),
  105. unsynced: make(map[*watching]struct{}),
  106. synced: make(map[string]map[*watching]struct{}),
  107. }
  108. defer func() {
  109. s.store.Close()
  110. os.Remove(tmpPath)
  111. }()
  112. testKey := []byte("foo")
  113. testValue := []byte("bar")
  114. s.Put(testKey, testValue)
  115. w := s.NewWatcher()
  116. // arbitrary number for watchers
  117. watcherN := 100
  118. for i := 0; i < watcherN; i++ {
  119. // use 1 to keep watchers in unsynced
  120. w.Watch(testKey, true, 1)
  121. }
  122. // Before running s.syncWatchings()
  123. //
  124. // synced should be empty
  125. // because we manually populate unsynced only
  126. if len(s.synced[string(testKey)]) != 0 {
  127. t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)]))
  128. }
  129. // unsynced should not be empty
  130. // because we manually populated unsynced only
  131. if len(s.unsynced) == 0 {
  132. t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
  133. }
  134. // this should move all unsynced watchings
  135. // to synced ones
  136. s.syncWatchings()
  137. // After running s.syncWatchings()
  138. //
  139. // synced should not be empty
  140. // because syncWatchings populates synced
  141. // in this test case
  142. if len(s.synced[string(testKey)]) == 0 {
  143. t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
  144. }
  145. // unsynced should be empty
  146. // because syncWatchings is expected to move
  147. // all watchings from unsynced to synced
  148. // in this test case
  149. if len(s.unsynced) != 0 {
  150. t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
  151. }
  152. // All of the watchings actually share one channel
  153. // so we only need to check one shared channel
  154. // (See watcher.go for more detail).
  155. if len(w.(*watcher).ch) != watcherN {
  156. t.Errorf("watched event size = %d, want %d", len(w.(*watcher).ch), watcherN)
  157. }
  158. }
  159. func TestUnsafeAddWatching(t *testing.T) {
  160. s := newWatchableStore(tmpPath)
  161. defer func() {
  162. s.store.Close()
  163. os.Remove(tmpPath)
  164. }()
  165. testKey := []byte("foo")
  166. testValue := []byte("bar")
  167. s.Put(testKey, testValue)
  168. size := 10
  169. ws := make([]*watching, size)
  170. for i := 0; i < size; i++ {
  171. ws[i] = &watching{
  172. key: testKey,
  173. prefix: true,
  174. cur: 0,
  175. }
  176. }
  177. // to test if unsafeAddWatching is correctly updating
  178. // synced map when adding new watching.
  179. for i, wa := range ws {
  180. if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil {
  181. t.Errorf("#%d: error = %v, want nil", i, err)
  182. }
  183. if v, ok := s.synced[string(testKey)]; !ok {
  184. t.Errorf("#%d: ok = %v, want ok true", i, ok)
  185. } else {
  186. if len(v) != i+1 {
  187. t.Errorf("#%d: len(v) = %d, want %d", i, len(v), i+1)
  188. }
  189. if _, ok := v[wa]; !ok {
  190. t.Errorf("#%d: ok = %v, want ok true", i, ok)
  191. }
  192. }
  193. }
  194. }