tx.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. package bolt
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "sort"
  7. "time"
  8. "unsafe"
  9. )
  10. // txid represents the internal transaction identifier.
  11. type txid uint64
  12. // Tx represents a read-only or read/write transaction on the database.
  13. // Read-only transactions can be used for retrieving values for keys and creating cursors.
  14. // Read/write transactions can create and remove buckets and create and remove keys.
  15. //
  16. // IMPORTANT: You must commit or rollback transactions when you are done with
  17. // them. Pages can not be reclaimed by the writer until no more transactions
  18. // are using them. A long running read transaction can cause the database to
  19. // quickly grow.
  20. type Tx struct {
  21. writable bool
  22. managed bool
  23. db *DB
  24. meta *meta
  25. root Bucket
  26. pages map[pgid]*page
  27. stats TxStats
  28. commitHandlers []func()
  29. // WriteFlag specifies the flag for write-related methods like WriteTo().
  30. // Tx opens the database file with the specified flag to copy the data.
  31. //
  32. // By default, the flag is unset, which works well for mostly in-memory
  33. // workloads. For databases that are much larger than available RAM,
  34. // set the flag to syscall.O_DIRECT to avoid trashing the page cache.
  35. WriteFlag int
  36. }
  37. // init initializes the transaction.
  38. func (tx *Tx) init(db *DB) {
  39. tx.db = db
  40. tx.pages = nil
  41. // Copy the meta page since it can be changed by the writer.
  42. tx.meta = &meta{}
  43. db.meta().copy(tx.meta)
  44. // Copy over the root bucket.
  45. tx.root = newBucket(tx)
  46. tx.root.bucket = &bucket{}
  47. *tx.root.bucket = tx.meta.root
  48. // Increment the transaction id and add a page cache for writable transactions.
  49. if tx.writable {
  50. tx.pages = make(map[pgid]*page)
  51. tx.meta.txid += txid(1)
  52. }
  53. }
  54. // ID returns the transaction id.
  55. func (tx *Tx) ID() int {
  56. return int(tx.meta.txid)
  57. }
  58. // DB returns a reference to the database that created the transaction.
  59. func (tx *Tx) DB() *DB {
  60. return tx.db
  61. }
  62. // Size returns current database size in bytes as seen by this transaction.
  63. func (tx *Tx) Size() int64 {
  64. return int64(tx.meta.pgid) * int64(tx.db.pageSize)
  65. }
  66. // Writable returns whether the transaction can perform write operations.
  67. func (tx *Tx) Writable() bool {
  68. return tx.writable
  69. }
  70. // Cursor creates a cursor associated with the root bucket.
  71. // All items in the cursor will return a nil value because all root bucket keys point to buckets.
  72. // The cursor is only valid as long as the transaction is open.
  73. // Do not use a cursor after the transaction is closed.
  74. func (tx *Tx) Cursor() *Cursor {
  75. return tx.root.Cursor()
  76. }
  77. // Stats retrieves a copy of the current transaction statistics.
  78. func (tx *Tx) Stats() TxStats {
  79. return tx.stats
  80. }
  81. // Bucket retrieves a bucket by name.
  82. // Returns nil if the bucket does not exist.
  83. // The bucket instance is only valid for the lifetime of the transaction.
  84. func (tx *Tx) Bucket(name []byte) *Bucket {
  85. return tx.root.Bucket(name)
  86. }
  87. // CreateBucket creates a new bucket.
  88. // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
  89. // The bucket instance is only valid for the lifetime of the transaction.
  90. func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
  91. return tx.root.CreateBucket(name)
  92. }
  93. // CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
  94. // Returns an error if the bucket name is blank, or if the bucket name is too long.
  95. // The bucket instance is only valid for the lifetime of the transaction.
  96. func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
  97. return tx.root.CreateBucketIfNotExists(name)
  98. }
  99. // DeleteBucket deletes a bucket.
  100. // Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
  101. func (tx *Tx) DeleteBucket(name []byte) error {
  102. return tx.root.DeleteBucket(name)
  103. }
  104. // ForEach executes a function for each bucket in the root.
  105. // If the provided function returns an error then the iteration is stopped and
  106. // the error is returned to the caller.
  107. func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
  108. return tx.root.ForEach(func(k, v []byte) error {
  109. if err := fn(k, tx.root.Bucket(k)); err != nil {
  110. return err
  111. }
  112. return nil
  113. })
  114. }
  115. // OnCommit adds a handler function to be executed after the transaction successfully commits.
  116. func (tx *Tx) OnCommit(fn func()) {
  117. tx.commitHandlers = append(tx.commitHandlers, fn)
  118. }
  119. // Commit writes all changes to disk and updates the meta page.
  120. // Returns an error if a disk write error occurs, or if Commit is
  121. // called on a read-only transaction.
  122. func (tx *Tx) Commit() error {
  123. _assert(!tx.managed, "managed tx commit not allowed")
  124. if tx.db == nil {
  125. return ErrTxClosed
  126. } else if !tx.writable {
  127. return ErrTxNotWritable
  128. }
  129. // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
  130. // Rebalance nodes which have had deletions.
  131. var startTime = time.Now()
  132. tx.root.rebalance()
  133. if tx.stats.Rebalance > 0 {
  134. tx.stats.RebalanceTime += time.Since(startTime)
  135. }
  136. // spill data onto dirty pages.
  137. startTime = time.Now()
  138. if err := tx.root.spill(); err != nil {
  139. tx.rollback()
  140. return err
  141. }
  142. tx.stats.SpillTime += time.Since(startTime)
  143. // Free the old root bucket.
  144. tx.meta.root.root = tx.root.root
  145. opgid := tx.meta.pgid
  146. // Free the freelist and allocate new pages for it. This will overestimate
  147. // the size of the freelist but not underestimate the size (which would be bad).
  148. tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
  149. p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
  150. if err != nil {
  151. tx.rollback()
  152. return err
  153. }
  154. if err := tx.db.freelist.write(p); err != nil {
  155. tx.rollback()
  156. return err
  157. }
  158. tx.meta.freelist = p.id
  159. // If the high water mark has moved up then attempt to grow the database.
  160. if tx.meta.pgid > opgid {
  161. if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
  162. tx.rollback()
  163. return err
  164. }
  165. }
  166. // Write dirty pages to disk.
  167. startTime = time.Now()
  168. if err := tx.write(); err != nil {
  169. tx.rollback()
  170. return err
  171. }
  172. // If strict mode is enabled then perform a consistency check.
  173. // Only the first consistency error is reported in the panic.
  174. if tx.db.StrictMode {
  175. if err, ok := <-tx.Check(); ok {
  176. panic("check fail: " + err.Error())
  177. }
  178. }
  179. // Write meta to disk.
  180. if err := tx.writeMeta(); err != nil {
  181. tx.rollback()
  182. return err
  183. }
  184. tx.stats.WriteTime += time.Since(startTime)
  185. // Finalize the transaction.
  186. tx.close()
  187. // Execute commit handlers now that the locks have been removed.
  188. for _, fn := range tx.commitHandlers {
  189. fn()
  190. }
  191. return nil
  192. }
  193. // Rollback closes the transaction and ignores all previous updates. Read-only
  194. // transactions must be rolled back and not committed.
  195. func (tx *Tx) Rollback() error {
  196. _assert(!tx.managed, "managed tx rollback not allowed")
  197. if tx.db == nil {
  198. return ErrTxClosed
  199. }
  200. tx.rollback()
  201. return nil
  202. }
  203. func (tx *Tx) rollback() {
  204. if tx.db == nil {
  205. return
  206. }
  207. if tx.writable {
  208. tx.db.freelist.rollback(tx.meta.txid)
  209. tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
  210. }
  211. tx.close()
  212. }
  213. func (tx *Tx) close() {
  214. if tx.db == nil {
  215. return
  216. }
  217. if tx.writable {
  218. // Grab freelist stats.
  219. var freelistFreeN = tx.db.freelist.free_count()
  220. var freelistPendingN = tx.db.freelist.pending_count()
  221. var freelistAlloc = tx.db.freelist.size()
  222. // Remove transaction ref & writer lock.
  223. tx.db.rwtx = nil
  224. tx.db.rwlock.Unlock()
  225. // Merge statistics.
  226. tx.db.statlock.Lock()
  227. tx.db.stats.FreePageN = freelistFreeN
  228. tx.db.stats.PendingPageN = freelistPendingN
  229. tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
  230. tx.db.stats.FreelistInuse = freelistAlloc
  231. tx.db.stats.TxStats.add(&tx.stats)
  232. tx.db.statlock.Unlock()
  233. } else {
  234. tx.db.removeTx(tx)
  235. }
  236. // Clear all references.
  237. tx.db = nil
  238. tx.meta = nil
  239. tx.root = Bucket{tx: tx}
  240. tx.pages = nil
  241. }
  242. // Copy writes the entire database to a writer.
  243. // This function exists for backwards compatibility. Use WriteTo() instead.
  244. func (tx *Tx) Copy(w io.Writer) error {
  245. _, err := tx.WriteTo(w)
  246. return err
  247. }
  248. // WriteTo writes the entire database to a writer.
  249. // If err == nil then exactly tx.Size() bytes will be written into the writer.
  250. func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
  251. // Attempt to open reader with WriteFlag
  252. f, err := os.OpenFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
  253. if err != nil {
  254. return 0, err
  255. }
  256. defer func() { _ = f.Close() }()
  257. // Copy the meta pages.
  258. tx.db.metalock.Lock()
  259. n, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
  260. tx.db.metalock.Unlock()
  261. if err != nil {
  262. return n, fmt.Errorf("meta copy: %s", err)
  263. }
  264. // Copy data pages.
  265. wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
  266. n += wn
  267. if err != nil {
  268. return n, err
  269. }
  270. return n, f.Close()
  271. }
  272. // CopyFile copies the entire database to file at the given path.
  273. // A reader transaction is maintained during the copy so it is safe to continue
  274. // using the database while a copy is in progress.
  275. func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
  276. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
  277. if err != nil {
  278. return err
  279. }
  280. err = tx.Copy(f)
  281. if err != nil {
  282. _ = f.Close()
  283. return err
  284. }
  285. return f.Close()
  286. }
  287. // Check performs several consistency checks on the database for this transaction.
  288. // An error is returned if any inconsistency is found.
  289. //
  290. // It can be safely run concurrently on a writable transaction. However, this
  291. // incurs a high cost for large databases and databases with a lot of subbuckets
  292. // because of caching. This overhead can be removed if running on a read-only
  293. // transaction, however, it is not safe to execute other writer transactions at
  294. // the same time.
  295. func (tx *Tx) Check() <-chan error {
  296. ch := make(chan error)
  297. go tx.check(ch)
  298. return ch
  299. }
  300. func (tx *Tx) check(ch chan error) {
  301. // Check if any pages are double freed.
  302. freed := make(map[pgid]bool)
  303. for _, id := range tx.db.freelist.all() {
  304. if freed[id] {
  305. ch <- fmt.Errorf("page %d: already freed", id)
  306. }
  307. freed[id] = true
  308. }
  309. // Track every reachable page.
  310. reachable := make(map[pgid]*page)
  311. reachable[0] = tx.page(0) // meta0
  312. reachable[1] = tx.page(1) // meta1
  313. for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
  314. reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
  315. }
  316. // Recursively check buckets.
  317. tx.checkBucket(&tx.root, reachable, freed, ch)
  318. // Ensure all pages below high water mark are either reachable or freed.
  319. for i := pgid(0); i < tx.meta.pgid; i++ {
  320. _, isReachable := reachable[i]
  321. if !isReachable && !freed[i] {
  322. ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
  323. }
  324. }
  325. // Close the channel to signal completion.
  326. close(ch)
  327. }
  328. func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
  329. // Ignore inline buckets.
  330. if b.root == 0 {
  331. return
  332. }
  333. // Check every page used by this bucket.
  334. b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
  335. if p.id > tx.meta.pgid {
  336. ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
  337. }
  338. // Ensure each page is only referenced once.
  339. for i := pgid(0); i <= pgid(p.overflow); i++ {
  340. var id = p.id + i
  341. if _, ok := reachable[id]; ok {
  342. ch <- fmt.Errorf("page %d: multiple references", int(id))
  343. }
  344. reachable[id] = p
  345. }
  346. // We should only encounter un-freed leaf and branch pages.
  347. if freed[p.id] {
  348. ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
  349. } else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
  350. ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
  351. }
  352. })
  353. // Check each bucket within this bucket.
  354. _ = b.ForEach(func(k, v []byte) error {
  355. if child := b.Bucket(k); child != nil {
  356. tx.checkBucket(child, reachable, freed, ch)
  357. }
  358. return nil
  359. })
  360. }
  361. // allocate returns a contiguous block of memory starting at a given page.
  362. func (tx *Tx) allocate(count int) (*page, error) {
  363. p, err := tx.db.allocate(count)
  364. if err != nil {
  365. return nil, err
  366. }
  367. // Save to our page cache.
  368. tx.pages[p.id] = p
  369. // Update statistics.
  370. tx.stats.PageCount++
  371. tx.stats.PageAlloc += count * tx.db.pageSize
  372. return p, nil
  373. }
  374. // write writes any dirty pages to disk.
  375. func (tx *Tx) write() error {
  376. // Sort pages by id.
  377. pages := make(pages, 0, len(tx.pages))
  378. for _, p := range tx.pages {
  379. pages = append(pages, p)
  380. }
  381. sort.Sort(pages)
  382. // Write pages to disk in order.
  383. for _, p := range pages {
  384. size := (int(p.overflow) + 1) * tx.db.pageSize
  385. offset := int64(p.id) * int64(tx.db.pageSize)
  386. // Write out page in "max allocation" sized chunks.
  387. ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
  388. for {
  389. // Limit our write to our max allocation size.
  390. sz := size
  391. if sz > maxAllocSize-1 {
  392. sz = maxAllocSize - 1
  393. }
  394. // Write chunk to disk.
  395. buf := ptr[:sz]
  396. if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
  397. return err
  398. }
  399. // Update statistics.
  400. tx.stats.Write++
  401. // Exit inner for loop if we've written all the chunks.
  402. size -= sz
  403. if size == 0 {
  404. break
  405. }
  406. // Otherwise move offset forward and move pointer to next chunk.
  407. offset += int64(sz)
  408. ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
  409. }
  410. }
  411. // Ignore file sync if flag is set on DB.
  412. if !tx.db.NoSync || IgnoreNoSync {
  413. if err := fdatasync(tx.db); err != nil {
  414. return err
  415. }
  416. }
  417. // Clear out page cache.
  418. tx.pages = make(map[pgid]*page)
  419. return nil
  420. }
  421. // writeMeta writes the meta to the disk.
  422. func (tx *Tx) writeMeta() error {
  423. // Create a temporary buffer for the meta page.
  424. buf := make([]byte, tx.db.pageSize)
  425. p := tx.db.pageInBuffer(buf, 0)
  426. tx.meta.write(p)
  427. // Write the meta page to file.
  428. if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
  429. return err
  430. }
  431. if !tx.db.NoSync || IgnoreNoSync {
  432. if err := fdatasync(tx.db); err != nil {
  433. return err
  434. }
  435. }
  436. // Update statistics.
  437. tx.stats.Write++
  438. return nil
  439. }
  440. // page returns a reference to the page with a given id.
  441. // If page has been written to then a temporary buffered page is returned.
  442. func (tx *Tx) page(id pgid) *page {
  443. // Check the dirty pages first.
  444. if tx.pages != nil {
  445. if p, ok := tx.pages[id]; ok {
  446. return p
  447. }
  448. }
  449. // Otherwise return directly from the mmap.
  450. return tx.db.page(id)
  451. }
  452. // forEachPage iterates over every page within a given page and executes a function.
  453. func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
  454. p := tx.page(pgid)
  455. // Execute function.
  456. fn(p, depth)
  457. // Recursively loop over children.
  458. if (p.flags & branchPageFlag) != 0 {
  459. for i := 0; i < int(p.count); i++ {
  460. elem := p.branchPageElement(uint16(i))
  461. tx.forEachPage(elem.pgid, depth+1, fn)
  462. }
  463. }
  464. }
  465. // Page returns page information for a given page number.
  466. // This is only safe for concurrent use when used by a writable transaction.
  467. func (tx *Tx) Page(id int) (*PageInfo, error) {
  468. if tx.db == nil {
  469. return nil, ErrTxClosed
  470. } else if pgid(id) >= tx.meta.pgid {
  471. return nil, nil
  472. }
  473. // Build the page info.
  474. p := tx.db.page(pgid(id))
  475. info := &PageInfo{
  476. ID: id,
  477. Count: int(p.count),
  478. OverflowCount: int(p.overflow),
  479. }
  480. // Determine the type (or if it's free).
  481. if tx.db.freelist.freed(pgid(id)) {
  482. info.Type = "free"
  483. } else {
  484. info.Type = p.typ()
  485. }
  486. return info, nil
  487. }
  488. // TxStats represents statistics about the actions performed by the transaction.
  489. type TxStats struct {
  490. // Page statistics.
  491. PageCount int // number of page allocations
  492. PageAlloc int // total bytes allocated
  493. // Cursor statistics.
  494. CursorCount int // number of cursors created
  495. // Node statistics
  496. NodeCount int // number of node allocations
  497. NodeDeref int // number of node dereferences
  498. // Rebalance statistics.
  499. Rebalance int // number of node rebalances
  500. RebalanceTime time.Duration // total time spent rebalancing
  501. // Split/Spill statistics.
  502. Split int // number of nodes split
  503. Spill int // number of nodes spilled
  504. SpillTime time.Duration // total time spent spilling
  505. // Write statistics.
  506. Write int // number of writes performed
  507. WriteTime time.Duration // total time spent writing to disk
  508. }
  509. func (s *TxStats) add(other *TxStats) {
  510. s.PageCount += other.PageCount
  511. s.PageAlloc += other.PageAlloc
  512. s.CursorCount += other.CursorCount
  513. s.NodeCount += other.NodeCount
  514. s.NodeDeref += other.NodeDeref
  515. s.Rebalance += other.Rebalance
  516. s.RebalanceTime += other.RebalanceTime
  517. s.Split += other.Split
  518. s.Spill += other.Spill
  519. s.SpillTime += other.SpillTime
  520. s.Write += other.Write
  521. s.WriteTime += other.WriteTime
  522. }
  523. // Sub calculates and returns the difference between two sets of transaction stats.
  524. // This is useful when obtaining stats at two different points and time and
  525. // you need the performance counters that occurred within that time span.
  526. func (s *TxStats) Sub(other *TxStats) TxStats {
  527. var diff TxStats
  528. diff.PageCount = s.PageCount - other.PageCount
  529. diff.PageAlloc = s.PageAlloc - other.PageAlloc
  530. diff.CursorCount = s.CursorCount - other.CursorCount
  531. diff.NodeCount = s.NodeCount - other.NodeCount
  532. diff.NodeDeref = s.NodeDeref - other.NodeDeref
  533. diff.Rebalance = s.Rebalance - other.Rebalance
  534. diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
  535. diff.Split = s.Split - other.Split
  536. diff.Spill = s.Spill - other.Spill
  537. diff.SpillTime = s.SpillTime - other.SpillTime
  538. diff.Write = s.Write - other.Write
  539. diff.WriteTime = s.WriteTime - other.WriteTime
  540. return diff
  541. }