backend.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package backend
  15. import (
  16. "fmt"
  17. "hash/crc32"
  18. "io"
  19. "io/ioutil"
  20. "log"
  21. "os"
  22. "path"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
  27. )
  28. var (
  29. defaultBatchLimit = 10000
  30. defaultBatchInterval = 100 * time.Millisecond
  31. defragLimit = 10000
  32. // InitialMmapSize is the initial size of the mmapped region. Setting this larger than
  33. // the potential max db size can prevent writer from blocking reader.
  34. // This only works for linux.
  35. InitialMmapSize = 10 * 1024 * 1024 * 1024
  36. )
  37. type Backend interface {
  38. BatchTx() BatchTx
  39. Snapshot() Snapshot
  40. Hash() (uint32, error)
  41. // Size returns the current size of the backend.
  42. Size() int64
  43. Defrag() error
  44. ForceCommit()
  45. Close() error
  46. }
  47. type Snapshot interface {
  48. // Size gets the size of the snapshot.
  49. Size() int64
  50. // WriteTo writes the snapshot into the given writer.
  51. WriteTo(w io.Writer) (n int64, err error)
  52. // Close closes the snapshot.
  53. Close() error
  54. }
  55. type backend struct {
  56. mu sync.RWMutex
  57. db *bolt.DB
  58. batchInterval time.Duration
  59. batchLimit int
  60. batchTx *batchTx
  61. size int64
  62. // number of commits since start
  63. commits int64
  64. stopc chan struct{}
  65. donec chan struct{}
  66. }
  67. func New(path string, d time.Duration, limit int) Backend {
  68. return newBackend(path, d, limit)
  69. }
  70. func NewDefaultBackend(path string) Backend {
  71. return newBackend(path, defaultBatchInterval, defaultBatchLimit)
  72. }
  73. func newBackend(path string, d time.Duration, limit int) *backend {
  74. db, err := bolt.Open(path, 0600, boltOpenOptions)
  75. if err != nil {
  76. log.Panicf("backend: cannot open database at %s (%v)", path, err)
  77. }
  78. b := &backend{
  79. db: db,
  80. batchInterval: d,
  81. batchLimit: limit,
  82. stopc: make(chan struct{}),
  83. donec: make(chan struct{}),
  84. }
  85. b.batchTx = newBatchTx(b)
  86. go b.run()
  87. return b
  88. }
  89. // BatchTx returns the current batch tx in coalescer. The tx can be used for read and
  90. // write operations. The write result can be retrieved within the same tx immediately.
  91. // The write result is isolated with other txs until the current one get committed.
  92. func (b *backend) BatchTx() BatchTx {
  93. return b.batchTx
  94. }
  95. // ForceCommit forces the current batching tx to commit.
  96. func (b *backend) ForceCommit() {
  97. b.batchTx.Commit()
  98. }
  99. func (b *backend) Snapshot() Snapshot {
  100. b.batchTx.Commit()
  101. b.mu.RLock()
  102. defer b.mu.RUnlock()
  103. tx, err := b.db.Begin(false)
  104. if err != nil {
  105. log.Fatalf("backend: cannot begin tx (%s)", err)
  106. }
  107. return &snapshot{tx}
  108. }
  109. func (b *backend) Hash() (uint32, error) {
  110. h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
  111. b.mu.RLock()
  112. defer b.mu.RUnlock()
  113. err := b.db.View(func(tx *bolt.Tx) error {
  114. c := tx.Cursor()
  115. for next, _ := c.First(); next != nil; next, _ = c.Next() {
  116. b := tx.Bucket(next)
  117. if b == nil {
  118. return fmt.Errorf("cannot get hash of bucket %s", string(next))
  119. }
  120. h.Write(next)
  121. b.ForEach(func(k, v []byte) error {
  122. h.Write(k)
  123. h.Write(v)
  124. return nil
  125. })
  126. }
  127. return nil
  128. })
  129. if err != nil {
  130. return 0, err
  131. }
  132. return h.Sum32(), nil
  133. }
  134. func (b *backend) Size() int64 {
  135. return atomic.LoadInt64(&b.size)
  136. }
  137. func (b *backend) run() {
  138. defer close(b.donec)
  139. for {
  140. select {
  141. case <-time.After(b.batchInterval):
  142. case <-b.stopc:
  143. b.batchTx.CommitAndStop()
  144. return
  145. }
  146. b.batchTx.Commit()
  147. }
  148. }
  149. func (b *backend) Close() error {
  150. close(b.stopc)
  151. <-b.donec
  152. return b.db.Close()
  153. }
  154. // Commits returns total number of commits since start
  155. func (b *backend) Commits() int64 {
  156. return atomic.LoadInt64(&b.commits)
  157. }
  158. func (b *backend) Defrag() error {
  159. // TODO: make this non-blocking?
  160. // lock batchTx to ensure nobody is using previous tx, and then
  161. // close previous ongoing tx.
  162. b.batchTx.Lock()
  163. defer b.batchTx.Unlock()
  164. // lock database after lock tx to avoid deadlock.
  165. b.mu.Lock()
  166. defer b.mu.Unlock()
  167. b.batchTx.commit(true)
  168. b.batchTx.tx = nil
  169. tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
  170. if err != nil {
  171. return err
  172. }
  173. err = defragdb(b.db, tmpdb, defragLimit)
  174. if err != nil {
  175. tmpdb.Close()
  176. os.RemoveAll(tmpdb.Path())
  177. return err
  178. }
  179. dbp := b.db.Path()
  180. tdbp := tmpdb.Path()
  181. err = b.db.Close()
  182. if err != nil {
  183. log.Fatalf("backend: cannot close database (%s)", err)
  184. }
  185. err = tmpdb.Close()
  186. if err != nil {
  187. log.Fatalf("backend: cannot close database (%s)", err)
  188. }
  189. err = os.Rename(tdbp, dbp)
  190. if err != nil {
  191. log.Fatalf("backend: cannot rename database (%s)", err)
  192. }
  193. b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
  194. if err != nil {
  195. log.Panicf("backend: cannot open database at %s (%v)", dbp, err)
  196. }
  197. b.batchTx.tx, err = b.db.Begin(true)
  198. if err != nil {
  199. log.Fatalf("backend: cannot begin tx (%s)", err)
  200. }
  201. return nil
  202. }
  203. func defragdb(odb, tmpdb *bolt.DB, limit int) error {
  204. // open a tx on tmpdb for writes
  205. tmptx, err := tmpdb.Begin(true)
  206. if err != nil {
  207. return err
  208. }
  209. // open a tx on old db for read
  210. tx, err := odb.Begin(false)
  211. if err != nil {
  212. return err
  213. }
  214. defer tx.Rollback()
  215. c := tx.Cursor()
  216. count := 0
  217. for next, _ := c.First(); next != nil; next, _ = c.Next() {
  218. b := tx.Bucket(next)
  219. if b == nil {
  220. return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
  221. }
  222. tmpb, berr := tmptx.CreateBucketIfNotExists(next)
  223. if berr != nil {
  224. return berr
  225. }
  226. b.ForEach(func(k, v []byte) error {
  227. count++
  228. if count > limit {
  229. err = tmptx.Commit()
  230. if err != nil {
  231. return err
  232. }
  233. tmptx, err = tmpdb.Begin(true)
  234. if err != nil {
  235. return err
  236. }
  237. tmpb = tmptx.Bucket(next)
  238. }
  239. err = tmpb.Put(k, v)
  240. if err != nil {
  241. return err
  242. }
  243. return nil
  244. })
  245. }
  246. return tmptx.Commit()
  247. }
  248. // NewTmpBackend creates a backend implementation for testing.
  249. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
  250. dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
  251. if err != nil {
  252. log.Fatal(err)
  253. }
  254. tmpPath := path.Join(dir, "database")
  255. return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
  256. }
  257. func NewDefaultTmpBackend() (*backend, string) {
  258. return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
  259. }
  260. type snapshot struct {
  261. *bolt.Tx
  262. }
  263. func (s *snapshot) Close() error { return s.Tx.Rollback() }