file_pipeline.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wal
  15. import (
  16. "fmt"
  17. "os"
  18. "path/filepath"
  19. "github.com/coreos/etcd/pkg/fileutil"
  20. )
  21. // filePipeline pipelines allocating disk space
  22. type filePipeline struct {
  23. // dir to put files
  24. dir string
  25. // size of files to make, in bytes
  26. size int64
  27. // count number of files generated
  28. count int
  29. filec chan *fileutil.LockedFile
  30. errc chan error
  31. donec chan struct{}
  32. }
  33. func newFilePipeline(dir string, fileSize int64) *filePipeline {
  34. fp := &filePipeline{
  35. dir: dir,
  36. size: fileSize,
  37. filec: make(chan *fileutil.LockedFile),
  38. errc: make(chan error, 1),
  39. donec: make(chan struct{}),
  40. }
  41. go fp.run()
  42. return fp
  43. }
  44. // Open returns a fresh file for writing. Rename the file before calling
  45. // Open again or there will be file collisions.
  46. func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
  47. select {
  48. case f = <-fp.filec:
  49. case err = <-fp.errc:
  50. }
  51. return
  52. }
  53. func (fp *filePipeline) Close() error {
  54. close(fp.donec)
  55. return <-fp.errc
  56. }
  57. func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
  58. // count % 2 so this file isn't the same as the one last published
  59. fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
  60. if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
  61. return nil, err
  62. }
  63. if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
  64. plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
  65. f.Close()
  66. return nil, err
  67. }
  68. fp.count++
  69. return f, nil
  70. }
  71. func (fp *filePipeline) run() {
  72. defer close(fp.errc)
  73. for {
  74. f, err := fp.alloc()
  75. if err != nil {
  76. fp.errc <- err
  77. return
  78. }
  79. select {
  80. case fp.filec <- f:
  81. case <-fp.donec:
  82. os.Remove(f.Name())
  83. f.Close()
  84. return
  85. }
  86. }
  87. }