pipe.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package mongo
  2. import (
  3. "time"
  4. "github.com/globalsign/mgo"
  5. "github.com/tal-tech/go-zero/core/breaker"
  6. )
  7. type (
  8. Pipe interface {
  9. All(result interface{}) error
  10. AllowDiskUse() Pipe
  11. Batch(n int) Pipe
  12. Collation(collation *mgo.Collation) Pipe
  13. Explain(result interface{}) error
  14. Iter() Iter
  15. One(result interface{}) error
  16. SetMaxTime(d time.Duration) Pipe
  17. }
  18. promisedPipe struct {
  19. *mgo.Pipe
  20. promise keepablePromise
  21. }
  22. rejectedPipe struct{}
  23. )
  24. func (p promisedPipe) All(result interface{}) error {
  25. return p.promise.keep(p.Pipe.All(result))
  26. }
  27. func (p promisedPipe) AllowDiskUse() Pipe {
  28. p.Pipe.AllowDiskUse()
  29. return p
  30. }
  31. func (p promisedPipe) Batch(n int) Pipe {
  32. p.Pipe.Batch(n)
  33. return p
  34. }
  35. func (p promisedPipe) Collation(collation *mgo.Collation) Pipe {
  36. p.Pipe.Collation(collation)
  37. return p
  38. }
  39. func (p promisedPipe) Explain(result interface{}) error {
  40. return p.promise.keep(p.Pipe.Explain(result))
  41. }
  42. func (p promisedPipe) Iter() Iter {
  43. return promisedIter{
  44. Iter: p.Pipe.Iter(),
  45. promise: p.promise,
  46. }
  47. }
  48. func (p promisedPipe) One(result interface{}) error {
  49. return p.promise.keep(p.Pipe.One(result))
  50. }
  51. func (p promisedPipe) SetMaxTime(d time.Duration) Pipe {
  52. p.Pipe.SetMaxTime(d)
  53. return p
  54. }
  55. func (p rejectedPipe) All(result interface{}) error {
  56. return breaker.ErrServiceUnavailable
  57. }
  58. func (p rejectedPipe) AllowDiskUse() Pipe {
  59. return p
  60. }
  61. func (p rejectedPipe) Batch(n int) Pipe {
  62. return p
  63. }
  64. func (p rejectedPipe) Collation(collation *mgo.Collation) Pipe {
  65. return p
  66. }
  67. func (p rejectedPipe) Explain(result interface{}) error {
  68. return breaker.ErrServiceUnavailable
  69. }
  70. func (p rejectedPipe) Iter() Iter {
  71. return rejectedIter{}
  72. }
  73. func (p rejectedPipe) One(result interface{}) error {
  74. return breaker.ErrServiceUnavailable
  75. }
  76. func (p rejectedPipe) SetMaxTime(d time.Duration) Pipe {
  77. return p
  78. }