// consistent_watchable_store.go (3.2 KB)
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package storage
  15. import (
  16. "encoding/binary"
  17. "log"
  18. )
  19. var (
  20. consistentIndexKeyName = []byte("consistent_index")
  21. )
  22. // ConsistentIndexGetter is an interface that wraps the Get method.
  23. // Consistent index is the offset of an entry in a consistent replicated log.
  24. type ConsistentIndexGetter interface {
  25. // ConsistentIndex returns the consistent index of current executing entry.
  26. ConsistentIndex() uint64
  27. }
  28. type consistentWatchableStore struct {
  29. *watchableStore
  30. // The field is used to get the consistent index of current
  31. // executing entry.
  32. // When the store finishes executing current entry, it will
  33. // put the index got from ConsistentIndexGetter into the
  34. // underlying backend. This helps to recover consistent index
  35. // when restoring.
  36. ig ConsistentIndexGetter
  37. }
  38. func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
  39. return newConsistentWatchableStore(path, ig)
  40. }
  41. // newConsistentWatchableStore creates a new consistentWatchableStore
  42. // using the file at the given path.
  43. // If the file at the given path does not exist then it will be created automatically.
  44. func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore {
  45. return &consistentWatchableStore{
  46. watchableStore: newWatchableStore(path),
  47. ig: ig,
  48. }
  49. }
  50. func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) {
  51. id := s.TxnBegin()
  52. rev, err := s.TxnPut(id, key, value)
  53. if err != nil {
  54. log.Panicf("unexpected TxnPut error (%v)", err)
  55. }
  56. if err := s.TxnEnd(id); err != nil {
  57. log.Panicf("unexpected TxnEnd error (%v)", err)
  58. }
  59. return rev
  60. }
  61. func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
  62. id := s.TxnBegin()
  63. n, rev, err := s.TxnDeleteRange(id, key, end)
  64. if err != nil {
  65. log.Panicf("unexpected TxnDeleteRange error (%v)", err)
  66. }
  67. if err := s.TxnEnd(id); err != nil {
  68. log.Panicf("unexpected TxnEnd error (%v)", err)
  69. }
  70. return n, rev
  71. }
  72. func (s *consistentWatchableStore) TxnBegin() int64 {
  73. id := s.watchableStore.TxnBegin()
  74. // TODO: avoid this unnecessary allocation
  75. bs := make([]byte, 8)
  76. binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
  77. // put the index into the underlying backend
  78. // tx has been locked in TxnBegin, so there is no need to lock it again
  79. s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
  80. return id
  81. }
  82. func (s *consistentWatchableStore) ConsistentIndex() uint64 {
  83. tx := s.watchableStore.store.b.BatchTx()
  84. tx.Lock()
  85. defer tx.Unlock()
  86. // get the index
  87. _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
  88. if len(vs) == 0 {
  89. return 0
  90. }
  91. return binary.BigEndian.Uint64(vs[0])
  92. }