model.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package mongo
  2. import (
  3. "log"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. )
  7. type (
  8. options struct {
  9. timeout time.Duration
  10. }
  11. Option func(opts *options)
  12. Model struct {
  13. session *concurrentSession
  14. db *mgo.Database
  15. collection string
  16. opts []Option
  17. }
  18. )
  19. func MustNewModel(url, collection string, opts ...Option) *Model {
  20. model, err := NewModel(url, collection, opts...)
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. return model
  25. }
  26. func NewModel(url, collection string, opts ...Option) (*Model, error) {
  27. session, err := getConcurrentSession(url)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return &Model{
  32. session: session,
  33. // If name is empty, the database name provided in the dialed URL is used instead
  34. db: session.DB(""),
  35. collection: collection,
  36. opts: opts,
  37. }, nil
  38. }
  39. func (mm *Model) Find(query interface{}) (Query, error) {
  40. return mm.query(func(c Collection) Query {
  41. return c.Find(query)
  42. })
  43. }
  44. func (mm *Model) FindId(id interface{}) (Query, error) {
  45. return mm.query(func(c Collection) Query {
  46. return c.FindId(id)
  47. })
  48. }
  49. func (mm *Model) GetCollection(session *mgo.Session) Collection {
  50. return newCollection(mm.db.C(mm.collection).With(session))
  51. }
  52. func (mm *Model) Insert(docs ...interface{}) error {
  53. return mm.execute(func(c Collection) error {
  54. return c.Insert(docs...)
  55. })
  56. }
  57. func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
  58. return mm.pipe(func(c Collection) Pipe {
  59. return c.Pipe(pipeline)
  60. })
  61. }
  62. func (mm *Model) PutSession(session *mgo.Session) {
  63. mm.session.putSession(session)
  64. }
  65. func (mm *Model) Remove(selector interface{}) error {
  66. return mm.execute(func(c Collection) error {
  67. return c.Remove(selector)
  68. })
  69. }
  70. func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
  71. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  72. return c.RemoveAll(selector)
  73. })
  74. }
  75. func (mm *Model) RemoveId(id interface{}) error {
  76. return mm.execute(func(c Collection) error {
  77. return c.RemoveId(id)
  78. })
  79. }
  80. func (mm *Model) TakeSession() (*mgo.Session, error) {
  81. return mm.session.takeSession(mm.opts...)
  82. }
  83. func (mm *Model) Update(selector, update interface{}) error {
  84. return mm.execute(func(c Collection) error {
  85. return c.Update(selector, update)
  86. })
  87. }
  88. func (mm *Model) UpdateId(id, update interface{}) error {
  89. return mm.execute(func(c Collection) error {
  90. return c.UpdateId(id, update)
  91. })
  92. }
  93. func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
  94. return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
  95. return c.Upsert(selector, update)
  96. })
  97. }
  98. func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
  99. session, err := mm.TakeSession()
  100. if err != nil {
  101. return nil, err
  102. }
  103. defer mm.PutSession(session)
  104. return fn(mm.GetCollection(session))
  105. }
  106. func (mm *Model) execute(fn func(c Collection) error) error {
  107. session, err := mm.TakeSession()
  108. if err != nil {
  109. return err
  110. }
  111. defer mm.PutSession(session)
  112. return fn(mm.GetCollection(session))
  113. }
  114. func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
  115. session, err := mm.TakeSession()
  116. if err != nil {
  117. return nil, err
  118. }
  119. defer mm.PutSession(session)
  120. return fn(mm.GetCollection(session)), nil
  121. }
  122. func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
  123. session, err := mm.TakeSession()
  124. if err != nil {
  125. return nil, err
  126. }
  127. defer mm.PutSession(session)
  128. return fn(mm.GetCollection(session)), nil
  129. }
  130. func WithTimeout(timeout time.Duration) Option {
  131. return func(opts *options) {
  132. opts.timeout = timeout
  133. }
  134. }