file_pipeline.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wal
  15. import (
  16. "fmt"
  17. "os"
  18. "path"
  19. "github.com/coreos/etcd/pkg/fileutil"
  20. )
  21. // filePipeline pipelines allocating disk space
  22. type filePipeline struct {
  23. // dir to put files
  24. dir string
  25. // size of files to make, in bytes
  26. size int64
  27. // count number of files generated
  28. count int
  29. filec chan *fileutil.LockedFile
  30. errc chan error
  31. donec chan struct{}
  32. }
  33. func newFilePipeline(dir string, fileSize int64) *filePipeline {
  34. fp := &filePipeline{
  35. dir: dir,
  36. size: fileSize,
  37. filec: make(chan *fileutil.LockedFile),
  38. errc: make(chan error, 1),
  39. donec: make(chan struct{}),
  40. }
  41. go fp.run()
  42. return fp
  43. }
  44. // Open returns a fresh file for writing
  45. func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
  46. select {
  47. case f = <-fp.filec:
  48. case err = <-fp.errc:
  49. }
  50. return
  51. }
  52. func (fp *filePipeline) Close() error {
  53. close(fp.donec)
  54. return <-fp.errc
  55. }
  56. func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
  57. fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count))
  58. if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil {
  59. return nil, err
  60. }
  61. if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
  62. plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
  63. f.Close()
  64. return nil, err
  65. }
  66. fp.count++
  67. return f, nil
  68. }
  69. func (fp *filePipeline) run() {
  70. defer close(fp.errc)
  71. for {
  72. f, err := fp.alloc()
  73. if err != nil {
  74. fp.errc <- err
  75. return
  76. }
  77. select {
  78. case fp.filec <- f:
  79. case <-fp.donec:
  80. os.Remove(f.Name())
  81. f.Close()
  82. return
  83. }
  84. }
  85. }