watchable_store.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "fmt"
  17. "log"
  18. "sync"
  19. "time"
  20. "github.com/coreos/etcd/storage/storagepb"
  21. )
  22. const (
  23. // chanBufLen is the length of the buffered chan
  24. // for sending out watched events.
  25. // TODO: find a good buf value. 1024 is just a random one that
  26. // seems to be reasonable.
  27. chanBufLen = 1024
  28. )
// watchable is the registration side of a store that can be watched.
type watchable interface {
	// watch registers a watching on key, or on every key that has key as a
	// prefix when prefix is true. startRev is the revision the watching starts
	// from, id is stamped on the events delivered for it, and ch is the channel
	// those events are sent on. It returns the new watching and a function that
	// cancels it.
	watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
}
// watchableStore wraps a store and delivers an event to the registered
// watchings for every change made through it.
type watchableStore struct {
	// mu guards unsynced, synced and tx. It is also held for the whole
	// duration of a transaction: TxnBegin locks it and a successful TxnEnd
	// unlocks it.
	mu sync.Mutex

	*store

	// contains all unsynced watching that needs to sync events that have happened
	unsynced map[*watching]struct{}

	// contains all synced watching that are tracking the events that will happen
	// The key of the map is the key that the watching is watching on.
	synced map[string]map[*watching]struct{}

	// tx records the keys put and deleted by the ongoing transaction; it is
	// replaced on every TxnBegin.
	tx *ongoingTx

	// stopc is closed by Close to stop the background sync loop.
	stopc chan struct{}
	// wg lets Close wait for the background sync loop to exit.
	wg sync.WaitGroup
}
  44. func newWatchableStore(path string) *watchableStore {
  45. s := &watchableStore{
  46. store: newStore(path),
  47. unsynced: make(map[*watching]struct{}),
  48. synced: make(map[string]map[*watching]struct{}),
  49. stopc: make(chan struct{}),
  50. }
  51. s.wg.Add(1)
  52. go s.syncWatchingsLoop()
  53. return s
  54. }
  55. func (s *watchableStore) Put(key, value []byte) (rev int64) {
  56. s.mu.Lock()
  57. defer s.mu.Unlock()
  58. rev = s.store.Put(key, value)
  59. // TODO: avoid this range
  60. kvs, _, err := s.store.Range(key, nil, 0, rev)
  61. if err != nil {
  62. log.Panicf("unexpected range error (%v)", err)
  63. }
  64. s.handle(rev, storagepb.Event{
  65. Type: storagepb.PUT,
  66. Kv: &kvs[0],
  67. })
  68. return rev
  69. }
  70. func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
  71. s.mu.Lock()
  72. defer s.mu.Unlock()
  73. // TODO: avoid this range
  74. kvs, _, err := s.store.Range(key, end, 0, 0)
  75. if err != nil {
  76. log.Panicf("unexpected range error (%v)", err)
  77. }
  78. n, rev = s.store.DeleteRange(key, end)
  79. for _, kv := range kvs {
  80. s.handle(rev, storagepb.Event{
  81. Type: storagepb.DELETE,
  82. Kv: &storagepb.KeyValue{
  83. Key: kv.Key,
  84. },
  85. })
  86. }
  87. return n, rev
  88. }
  89. func (s *watchableStore) TxnBegin() int64 {
  90. s.mu.Lock()
  91. s.tx = newOngoingTx()
  92. return s.store.TxnBegin()
  93. }
  94. func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
  95. rev, err = s.store.TxnPut(txnID, key, value)
  96. if err == nil {
  97. s.tx.put(string(key))
  98. }
  99. return rev, err
  100. }
  101. func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
  102. kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0)
  103. if err != nil {
  104. log.Panicf("unexpected range error (%v)", err)
  105. }
  106. n, rev, err = s.store.TxnDeleteRange(txnID, key, end)
  107. if err == nil {
  108. for _, kv := range kvs {
  109. s.tx.del(string(kv.Key))
  110. }
  111. }
  112. return n, rev, err
  113. }
  114. func (s *watchableStore) TxnEnd(txnID int64) error {
  115. err := s.store.TxnEnd(txnID)
  116. if err != nil {
  117. return err
  118. }
  119. _, rev, _ := s.store.Range(nil, nil, 0, 0)
  120. for k := range s.tx.putm {
  121. kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
  122. if err != nil {
  123. log.Panicf("unexpected range error (%v)", err)
  124. }
  125. s.handle(rev, storagepb.Event{
  126. Type: storagepb.PUT,
  127. Kv: &kvs[0],
  128. })
  129. }
  130. for k := range s.tx.delm {
  131. s.handle(rev, storagepb.Event{
  132. Type: storagepb.DELETE,
  133. Kv: &storagepb.KeyValue{
  134. Key: []byte(k),
  135. },
  136. })
  137. }
  138. s.mu.Unlock()
  139. return nil
  140. }
  141. func (s *watchableStore) Close() error {
  142. close(s.stopc)
  143. s.wg.Wait()
  144. return s.store.Close()
  145. }
  146. func (s *watchableStore) NewWatcher() Watcher {
  147. watcherGauge.Inc()
  148. return &watcher{
  149. watchable: s,
  150. ch: make(chan storagepb.Event, chanBufLen),
  151. }
  152. }
  153. func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
  154. s.mu.Lock()
  155. defer s.mu.Unlock()
  156. wa := &watching{
  157. key: key,
  158. prefix: prefix,
  159. cur: startRev,
  160. id: id,
  161. ch: ch,
  162. }
  163. k := string(key)
  164. if startRev == 0 {
  165. if err := unsafeAddWatching(&s.synced, k, wa); err != nil {
  166. log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
  167. }
  168. } else {
  169. slowWatchingGauge.Inc()
  170. s.unsynced[wa] = struct{}{}
  171. }
  172. watchingGauge.Inc()
  173. cancel := CancelFunc(func() {
  174. s.mu.Lock()
  175. defer s.mu.Unlock()
  176. // remove global references of the watching
  177. if _, ok := s.unsynced[wa]; ok {
  178. delete(s.unsynced, wa)
  179. slowWatchingGauge.Dec()
  180. watchingGauge.Dec()
  181. return
  182. }
  183. if v, ok := s.synced[k]; ok {
  184. if _, ok := v[wa]; ok {
  185. delete(v, wa)
  186. // if there is nothing in s.synced[k],
  187. // remove the key from the synced
  188. if len(v) == 0 {
  189. delete(s.synced, k)
  190. }
  191. watchingGauge.Dec()
  192. }
  193. }
  194. // If we cannot find it, it should have finished watch.
  195. })
  196. return wa, cancel
  197. }
  198. // syncWatchingsLoop syncs the watching in the unsyncd map every 100ms.
  199. func (s *watchableStore) syncWatchingsLoop() {
  200. defer s.wg.Done()
  201. for {
  202. s.mu.Lock()
  203. s.syncWatchings()
  204. s.mu.Unlock()
  205. select {
  206. case <-time.After(100 * time.Millisecond):
  207. case <-s.stopc:
  208. return
  209. }
  210. }
  211. }
  212. // syncWatchings syncs the watchings in the unsyncd map.
  213. func (s *watchableStore) syncWatchings() {
  214. _, curRev, _ := s.store.Range(nil, nil, 0, 0)
  215. for w := range s.unsynced {
  216. var end []byte
  217. if w.prefix {
  218. end = make([]byte, len(w.key))
  219. copy(end, w.key)
  220. end[len(w.key)-1]++
  221. }
  222. limit := cap(w.ch) - len(w.ch)
  223. // the channel is full, try it in the next round
  224. if limit == 0 {
  225. continue
  226. }
  227. revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
  228. if err != nil {
  229. // TODO: send error event to watching
  230. delete(s.unsynced, w)
  231. continue
  232. }
  233. // push events to the channel
  234. for i, kv := range kvs {
  235. var evt storagepb.Event_EventType
  236. switch {
  237. case isTombstone(revbs[i]):
  238. evt = storagepb.DELETE
  239. default:
  240. evt = storagepb.PUT
  241. }
  242. w.ch <- storagepb.Event{
  243. Type: evt,
  244. Kv: &kv,
  245. WatchID: w.id,
  246. }
  247. pendingEventsGauge.Inc()
  248. }
  249. // switch to tracking future events if needed
  250. if nextRev > curRev {
  251. k := string(w.key)
  252. if err := unsafeAddWatching(&s.synced, k, w); err != nil {
  253. log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
  254. }
  255. delete(s.unsynced, w)
  256. continue
  257. }
  258. // put it back to try it in the next round
  259. w.cur = nextRev
  260. }
  261. slowWatchingGauge.Set(float64(len(s.unsynced)))
  262. }
// handle handles the change of the happening event on all watchings.
// It currently just forwards the event to notify.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
	s.notify(rev, ev)
}
  267. // notify notifies the fact that given event at the given rev just happened to
  268. // watchings that watch on the key of the event.
  269. func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
  270. // check all prefixes of the key to notify all corresponded watchings
  271. for i := 0; i <= len(ev.Kv.Key); i++ {
  272. k := string(ev.Kv.Key[:i])
  273. if wm, ok := s.synced[k]; ok {
  274. for w := range wm {
  275. // the watching needs to be notified when either it watches prefix or
  276. // the key is exactly matched.
  277. if !w.prefix && i != len(ev.Kv.Key) {
  278. continue
  279. }
  280. ev.WatchID = w.id
  281. select {
  282. case w.ch <- ev:
  283. pendingEventsGauge.Inc()
  284. default:
  285. w.cur = rev
  286. s.unsynced[w] = struct{}{}
  287. delete(wm, w)
  288. slowWatchingGauge.Inc()
  289. }
  290. }
  291. }
  292. }
  293. }
  294. type ongoingTx struct {
  295. // keys put/deleted in the ongoing txn
  296. putm map[string]struct{}
  297. delm map[string]struct{}
  298. }
  299. func newOngoingTx() *ongoingTx {
  300. return &ongoingTx{
  301. putm: make(map[string]struct{}),
  302. delm: make(map[string]struct{}),
  303. }
  304. }
  305. func (tx *ongoingTx) put(k string) {
  306. tx.putm[k] = struct{}{}
  307. if _, ok := tx.delm[k]; ok {
  308. delete(tx.delm, k)
  309. }
  310. }
  311. func (tx *ongoingTx) del(k string) {
  312. tx.delm[k] = struct{}{}
  313. if _, ok := tx.putm[k]; ok {
  314. delete(tx.putm, k)
  315. }
  316. }
// watching is a single registration for events on one key or one key prefix.
type watching struct {
	// the watching key
	key []byte
	// prefix indicates if watching is on a key or a prefix.
	// If prefix is true, the watching is on a prefix.
	prefix bool
	// cur is the current watching revision.
	// If cur is behind the current revision of the KV,
	// watching is unsynced and needs to catch up.
	cur int64
	// id is copied into the WatchID field of every event sent for this
	// watching.
	id int64

	// a chan to send out the watched events.
	// The chan might be shared with other watchings.
	ch chan<- storagepb.Event
}
  332. // unsafeAddWatching puts watching with key k into watchableStore's synced.
  333. // Make sure to this is thread-safe using mutex before and after.
  334. func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error {
  335. if wa == nil {
  336. return fmt.Errorf("nil watching received")
  337. }
  338. mp := *synced
  339. if v, ok := mp[k]; ok {
  340. if _, ok := v[wa]; ok {
  341. return fmt.Errorf("put the same watch twice: %+v", wa)
  342. } else {
  343. v[wa] = struct{}{}
  344. }
  345. return nil
  346. }
  347. mp[k] = make(map[*watching]struct{})
  348. mp[k][wa] = struct{}{}
  349. return nil
  350. }