watchable_store.go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"fmt"
	"log"
	"math"
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/storage/storagepb"
)

const (
	// chanBufLen is the length of the buffered chan
	// for sending out watched events.
	// TODO: find a good buf value. 1024 is just a random one that
	// seems to be reasonable.
	chanBufLen = 1024
)
// watchable is implemented by stores that support watching keys.
type watchable interface {
	watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
}

type watchableStore struct {
	mu sync.Mutex

	*store

	// contains all unsynced watchers that need to sync with events that have happened
	unsynced map[*watcher]struct{}

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced map[string]map[*watcher]struct{}

	tx *ongoingTx

	stopc chan struct{}
	wg    sync.WaitGroup
}
// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

func newWatchableStore(path string) *watchableStore {
	s := &watchableStore{
		store:    newDefaultStore(path),
		unsynced: make(map[*watcher]struct{}),
		synced:   make(map[string]map[*watcher]struct{}),
		stopc:    make(chan struct{}),
	}
	s.wg.Add(1)
	go s.syncWatchersLoop()
	return s
}
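
// watchableStoreUsageSketch is an illustrative sketch, not called anywhere in
// this package, of the flow this file implements: open a store, register a
// watcher on a key, write to that key, and receive the resulting event on the
// watcher's channel. The path "example-store" and the zero WatchID and LeaseID
// values are placeholders chosen for illustration only.
func watchableStoreUsageSketch() {
	s := newWatchableStore("example-store")
	defer s.Close()

	ch := make(chan WatchResponse, chanBufLen)
	// startRev == 0 registers the watcher as synced; a non-zero revision would
	// place it in unsynced until syncWatchers catches it up.
	_, cancel := s.watch([]byte("foo"), false, 0, WatchID(0), ch)
	defer cancel()

	// Put triggers notify, which routes the PUT event to the synced watcher.
	s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(0))

	wr := <-ch
	fmt.Printf("watch %d received %d event(s)\n", wr.WatchID, len(wr.Events))
}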
func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	rev = s.store.Put(key, value, lease)
	// TODO: avoid this range
	kvs, _, err := s.store.Range(key, nil, 0, rev)
	if err != nil {
		log.Panicf("unexpected range error (%v)", err)
	}
	ev := storagepb.Event{
		Type: storagepb.PUT,
		Kv:   &kvs[0],
	}
	s.handle(rev, []storagepb.Event{ev})
	return rev
}

func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO: avoid this range
	kvs, _, err := s.store.Range(key, end, 0, 0)
	if err != nil {
		log.Panicf("unexpected range error (%v)", err)
	}
	n, rev = s.store.DeleteRange(key, end)
	evs := make([]storagepb.Event, len(kvs))
	for i, kv := range kvs {
		evs[i] = storagepb.Event{
			Type: storagepb.DELETE,
			Kv: &storagepb.KeyValue{
				Key: kv.Key,
			}}
	}
	s.handle(rev, evs)
	return n, rev
}
func (s *watchableStore) TxnBegin() int64 {
	// the store mutex is held for the entire transaction and
	// released in TxnEnd.
	s.mu.Lock()
	s.tx = newOngoingTx()
	return s.store.TxnBegin()
}

func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
	rev, err = s.store.TxnPut(txnID, key, value, lease)
	if err == nil {
		s.tx.put(string(key))
	}
	return rev, err
}

func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
	kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0)
	if err != nil {
		log.Panicf("unexpected range error (%v)", err)
	}
	n, rev, err = s.store.TxnDeleteRange(txnID, key, end)
	if err == nil {
		for _, kv := range kvs {
			s.tx.del(string(kv.Key))
		}
	}
	return n, rev, err
}

func (s *watchableStore) TxnEnd(txnID int64) error {
	err := s.store.TxnEnd(txnID)
	if err != nil {
		return err
	}

	_, rev, _ := s.store.Range(nil, nil, 0, 0)

	evs := []storagepb.Event{}
	for k := range s.tx.putm {
		kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
		if err != nil {
			log.Panicf("unexpected range error (%v)", err)
		}
		ev := storagepb.Event{
			Type: storagepb.PUT,
			Kv:   &kvs[0],
		}
		evs = append(evs, ev)
	}
	for k := range s.tx.delm {
		ev := storagepb.Event{
			Type: storagepb.DELETE,
			Kv: &storagepb.KeyValue{
				Key: []byte(k),
			},
		}
		evs = append(evs, ev)
	}

	s.handle(rev, evs)
	s.mu.Unlock()
	return nil
}
func (s *watchableStore) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.store.Close()
}

func (s *watchableStore) NewWatchStream() WatchStream {
	watchStreamGauge.Inc()
	return &watchStream{
		watchable: s,
		ch:        make(chan WatchResponse, chanBufLen),
		cancels:   make(map[WatchID]cancelFunc),
	}
}

func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) {
	s.mu.Lock()
	defer s.mu.Unlock()

	wa := &watcher{
		key:    key,
		prefix: prefix,
		cur:    startRev,
		id:     id,
		ch:     ch,
	}

	k := string(key)
	if startRev == 0 {
		if err := unsafeAddWatcher(&s.synced, k, wa); err != nil {
			log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
		}
	} else {
		slowWatcherGauge.Inc()
		s.unsynced[wa] = struct{}{}
	}
	watcherGauge.Inc()

	cancel := cancelFunc(func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		// remove global references of the watcher
		if _, ok := s.unsynced[wa]; ok {
			delete(s.unsynced, wa)
			slowWatcherGauge.Dec()
			watcherGauge.Dec()
			return
		}

		if v, ok := s.synced[k]; ok {
			if _, ok := v[wa]; ok {
				delete(v, wa)
				// if there is nothing in s.synced[k],
				// remove the key from synced
				if len(v) == 0 {
					delete(s.synced, k)
				}
				watcherGauge.Dec()
			}
		}
		// If we cannot find the watcher, it has already finished its watch.
	})

	return wa, cancel
}
// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	for {
		s.mu.Lock()
		s.syncWatchers()
		s.mu.Unlock()

		select {
		case <-time.After(100 * time.Millisecond):
		case <-s.stopc:
			return
		}
	}
}
// syncWatchers syncs unsynced watchers by:
//	1. iterating over all unsynced watchers to find the minimum revision
//	   within their ranges, skipping any watcher whose current revision is
//	   behind the compact revision of the store
//	2. using this minimum revision to range over all key-value pairs
//	3. sending those events to the watchers
func (s *watchableStore) syncWatchers() {
	s.store.mu.Lock()
	defer s.store.mu.Unlock()

	if len(s.unsynced) == 0 {
		return
	}

	// in order to find key-value pairs from unsynced watchers, we need to
	// find the min revision index, and these revisions can be used to
	// query the backend store for key-value pairs
	minRev := int64(math.MaxInt64)

	curRev := s.store.currentRev.main
	compactionRev := s.store.compactMainRev

	// TODO: change the type of unsynced to match this map
	keyToUnsynced := make(map[string]map[*watcher]struct{})

	for w := range s.unsynced {
		k := string(w.key)

		if w.cur > curRev {
			panic("watcher current revision should not exceed current revision")
		}

		if w.cur < compactionRev {
			// TODO: return a compaction error to that watcher instead of
			// just removing it silently from unsynced.
			delete(s.unsynced, w)
			continue
		}

		if minRev >= w.cur {
			minRev = w.cur
		}

		if _, ok := keyToUnsynced[k]; !ok {
			keyToUnsynced[k] = make(map[*watcher]struct{})
		}
		keyToUnsynced[k][w] = struct{}{}
	}
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, minBytes)
	revToBytes(revision{main: curRev + 1}, maxBytes)

	// UnsafeRange returns keys and values. In boltdb, keys are revisions and
	// values are the actual key-value pairs in the backend.
	tx := s.store.b.BatchTx()
	tx.Lock()
	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
	tx.Unlock()

	evs := []storagepb.Event{}

	// get the list of all events from all key-value pairs
	for i, v := range vs {
		var kv storagepb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			log.Panicf("storage: cannot unmarshal event: %v", err)
		}

		k := string(kv.Key)
		if _, ok := keyToUnsynced[k]; !ok {
			continue
		}

		var ev storagepb.Event
		switch {
		case isTombstone(ks[i]):
			ev.Type = storagepb.DELETE
		default:
			ev.Type = storagepb.PUT
		}
		ev.Kv = &kv

		evs = append(evs, ev)
	}

	for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
		wr := WatchResponse{WatchID: w.id, Events: es}
		select {
		case w.ch <- wr:
			pendingEventsGauge.Add(float64(len(es)))
		default:
			// TODO: handle watchers with full channels.
			// continue to process other watchers for now; the full ones
			// will be processed next time and hopefully will not be full then.
			continue
		}

		k := string(w.key)
		if err := unsafeAddWatcher(&s.synced, k, w); err != nil {
			log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
		}
		delete(s.unsynced, w)
	}

	slowWatcherGauge.Set(float64(len(s.unsynced)))
}
// handle handles the changes made by the given events at the given rev on all watchers.
func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
	s.notify(rev, evs)
}

// notify notifies watchers that watch on the key of an event that the event
// at the given rev just happened. A watcher whose channel is full is moved to
// the unsynced map so it can catch up later.
func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
	we := newWatcherToEventMap(s.synced, evs)
	for _, wm := range s.synced {
		for w := range wm {
			es, ok := we[w]
			if !ok {
				continue
			}
			wr := WatchResponse{WatchID: w.id, Events: es}
			select {
			case w.ch <- wr:
				pendingEventsGauge.Add(float64(len(es)))
			default:
				// move slow watcher to unsynced
				w.cur = rev
				s.unsynced[w] = struct{}{}
				delete(wm, w)
				slowWatcherGauge.Inc()
			}
		}
	}
}
type ongoingTx struct {
	// keys put/deleted in the ongoing txn
	putm map[string]struct{}
	delm map[string]struct{}
}

func newOngoingTx() *ongoingTx {
	return &ongoingTx{
		putm: make(map[string]struct{}),
		delm: make(map[string]struct{}),
	}
}

func (tx *ongoingTx) put(k string) {
	tx.putm[k] = struct{}{}
	if _, ok := tx.delm[k]; ok {
		delete(tx.delm, k)
	}
}

func (tx *ongoingTx) del(k string) {
	tx.delm[k] = struct{}{}
	if _, ok := tx.putm[k]; ok {
		delete(tx.putm, k)
	}
}
type watcher struct {
	// the watcher key
	key []byte
	// prefix indicates if the watcher is on a key or a prefix.
	// If prefix is true, the watcher is on a prefix.
	prefix bool
	// cur is the current watcher revision.
	// If cur is behind the current revision of the KV,
	// the watcher is unsynced and needs to catch up.
	cur int64
	id  WatchID

	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

// unsafeAddWatcher puts the watcher with key k into the watchableStore's
// synced map. The caller must hold the store's mutex to make this thread-safe.
func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error {
	if wa == nil {
		return fmt.Errorf("nil watcher received")
	}
	mp := *synced
	if v, ok := mp[k]; ok {
		if _, ok := v[wa]; ok {
			return fmt.Errorf("put the same watcher twice: %+v", wa)
		}
		v[wa] = struct{}{}
		return nil
	}

	mp[k] = make(map[*watcher]struct{})
	mp[k][wa] = struct{}{}
	return nil
}
// newWatcherToEventMap creates a map with watchers as keys and events as
// values. It enables quick event lookup by watcher.
func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event {
	watcherToEvents := make(map[*watcher][]storagepb.Event)
	for _, ev := range evs {
		key := string(ev.Kv.Key)

		// check all prefixes of the key to notify all corresponding watchers
		for i := 0; i <= len(key); i++ {
			k := string(key[:i])

			wm, ok := sm[k]
			if !ok {
				continue
			}

			for w := range wm {
				// the watcher needs to be notified either when it watches a
				// prefix or when the key matches exactly.
				if !w.prefix && i != len(ev.Kv.Key) {
					continue
				}

				if _, ok := watcherToEvents[w]; !ok {
					watcherToEvents[w] = []storagepb.Event{}
				}
				watcherToEvents[w] = append(watcherToEvents[w], ev)
			}
		}
	}
	return watcherToEvents
}
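
// newWatcherToEventMapSketch is an illustrative sketch, not used by the store,
// of how newWatcherToEventMap routes events: a watcher registered under the
// exact key "foo" and a prefix watcher registered under "fo" both receive a
// PUT event on key "foo", while a non-prefix watcher registered under "fo"
// would not. The keys and values here are placeholders for illustration only.
func newWatcherToEventMapSketch() map[*watcher][]storagepb.Event {
	exact := &watcher{key: []byte("foo")}
	pre := &watcher{key: []byte("fo"), prefix: true}

	sm := map[string]map[*watcher]struct{}{
		"foo": {exact: {}},
		"fo":  {pre: {}},
	}
	evs := []storagepb.Event{{
		Type: storagepb.PUT,
		Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar")},
	}}

	// The returned map contains both watchers, each mapped to the single PUT event.
	return newWatcherToEventMap(sm, evs)
}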