model.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package mongo
  2. import (
  3. "log"
  4. "time"
  5. "git.i2edu.net/i2/go-zero/core/breaker"
  6. "github.com/globalsign/mgo"
  7. )
  8. type (
  9. options struct {
  10. timeout time.Duration
  11. }
  12. // Option defines the method to customize a mongo model.
  13. Option func(opts *options)
  14. // A Model is a mongo model.
  15. Model struct {
  16. session *concurrentSession
  17. db *mgo.Database
  18. collection string
  19. brk breaker.Breaker
  20. opts []Option
  21. }
  22. )
  23. // MustNewModel returns a Model, exits on errors.
  24. func MustNewModel(url, collection string, opts ...Option) *Model {
  25. model, err := NewModel(url, collection, opts...)
  26. if err != nil {
  27. log.Fatal(err)
  28. }
  29. return model
  30. }
  31. // NewModel returns a Model.
  32. func NewModel(url, collection string, opts ...Option) (*Model, error) {
  33. session, err := getConcurrentSession(url)
  34. if err != nil {
  35. return nil, err
  36. }
  37. return &Model{
  38. session: session,
  39. // If name is empty, the database name provided in the dialed URL is used instead
  40. db: session.DB(""),
  41. collection: collection,
  42. brk: breaker.GetBreaker(url),
  43. opts: opts,
  44. }, nil
  45. }
  46. // Find finds a record with given query.
  47. func (mm *Model) Find(query interface{}) (Query, error) {
  48. return mm.query(func(c Collection) Query {
  49. return c.Find(query)
  50. })
  51. }
  52. // FindId finds a record with given id.
  53. func (mm *Model) FindId(id interface{}) (Query, error) {
  54. return mm.query(func(c Collection) Query {
  55. return c.FindId(id)
  56. })
  57. }
  58. // GetCollection returns a Collection with given session.
  59. func (mm *Model) GetCollection(session *mgo.Session) Collection {
  60. return newCollection(mm.db.C(mm.collection).With(session), mm.brk)
  61. }
  62. // Insert inserts docs into mm.
  63. func (mm *Model) Insert(docs ...interface{}) error {
  64. return mm.execute(func(c Collection) error {
  65. return c.Insert(docs...)
  66. })
  67. }
  68. // Pipe returns a Pipe with given pipeline.
  69. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
  70. return mm.pipe(func(c Collection) Pipe {
  71. return c.Pipe(pipeline)
  72. })
  73. }
  74. // PutSession returns the given session.
  75. func (mm *Model) PutSession(session *mgo.Session) {
  76. mm.session.putSession(session)
  77. }
  78. // Remove removes the records with given selector.
  79. func (mm *Model) Remove(selector interface{}) error {
  80. return mm.execute(func(c Collection) error {
  81. return c.Remove(selector)
  82. })
  83. }
  84. // RemoveAll removes all with given selector and returns a mgo.ChangeInfo.
  85. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
  86. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  87. return c.RemoveAll(selector)
  88. })
  89. }
  90. // RemoveId removes a record with given id.
  91. func (mm *Model) RemoveId(id interface{}) error {
  92. return mm.execute(func(c Collection) error {
  93. return c.RemoveId(id)
  94. })
  95. }
  96. // TakeSession gets a session.
  97. func (mm *Model) TakeSession() (*mgo.Session, error) {
  98. return mm.session.takeSession(mm.opts...)
  99. }
  100. // Update updates a record with given selector.
  101. func (mm *Model) Update(selector, update interface{}) error {
  102. return mm.execute(func(c Collection) error {
  103. return c.Update(selector, update)
  104. })
  105. }
  106. // UpdateId updates a record with given id.
  107. func (mm *Model) UpdateId(id, update interface{}) error {
  108. return mm.execute(func(c Collection) error {
  109. return c.UpdateId(id, update)
  110. })
  111. }
  112. // Upsert upserts a record with given selector, and returns a mgo.ChangeInfo.
  113. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
  114. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  115. return c.Upsert(selector, update)
  116. })
  117. }
  118. func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
  119. session, err := mm.TakeSession()
  120. if err != nil {
  121. return nil, err
  122. }
  123. defer mm.PutSession(session)
  124. return fn(mm.GetCollection(session))
  125. }
  126. func (mm *Model) execute(fn func(c Collection) error) error {
  127. session, err := mm.TakeSession()
  128. if err != nil {
  129. return err
  130. }
  131. defer mm.PutSession(session)
  132. return fn(mm.GetCollection(session))
  133. }
  134. func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
  135. session, err := mm.TakeSession()
  136. if err != nil {
  137. return nil, err
  138. }
  139. defer mm.PutSession(session)
  140. return fn(mm.GetCollection(session)), nil
  141. }
  142. func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
  143. session, err := mm.TakeSession()
  144. if err != nil {
  145. return nil, err
  146. }
  147. defer mm.PutSession(session)
  148. return fn(mm.GetCollection(session)), nil
  149. }
  150. // WithTimeout customizes an operation with given timeout.
  151. func WithTimeout(timeout time.Duration) Option {
  152. return func(opts *options) {
  153. opts.timeout = timeout
  154. }
  155. }