collection.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package mongo
  2. import (
  3. "encoding/json"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. "github.com/tal-tech/go-zero/core/breaker"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. "github.com/tal-tech/go-zero/core/stores/mongo/internal"
  9. "github.com/tal-tech/go-zero/core/timex"
  10. )
  11. const slowThreshold = time.Millisecond * 500
  12. // ErrNotFound is an alias of mgo.ErrNotFound.
  13. var ErrNotFound = mgo.ErrNotFound
  14. type (
  15. // Collection interface represents a mongo connection.
  16. Collection interface {
  17. Find(query interface{}) Query
  18. FindId(id interface{}) Query
  19. Insert(docs ...interface{}) error
  20. Pipe(pipeline interface{}) Pipe
  21. Remove(selector interface{}) error
  22. RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
  23. RemoveId(id interface{}) error
  24. Update(selector, update interface{}) error
  25. UpdateId(id, update interface{}) error
  26. Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
  27. }
  28. decoratedCollection struct {
  29. name string
  30. collection internal.MgoCollection
  31. brk breaker.Breaker
  32. }
  33. keepablePromise struct {
  34. promise breaker.Promise
  35. log func(error)
  36. }
  37. )
  38. func newCollection(collection *mgo.Collection, brk breaker.Breaker) Collection {
  39. return &decoratedCollection{
  40. name: collection.FullName,
  41. collection: collection,
  42. brk: brk,
  43. }
  44. }
  45. func (c *decoratedCollection) Find(query interface{}) Query {
  46. promise, err := c.brk.Allow()
  47. if err != nil {
  48. return rejectedQuery{}
  49. }
  50. startTime := timex.Now()
  51. return promisedQuery{
  52. Query: c.collection.Find(query),
  53. promise: keepablePromise{
  54. promise: promise,
  55. log: func(err error) {
  56. duration := timex.Since(startTime)
  57. c.logDuration("find", duration, err, query)
  58. },
  59. },
  60. }
  61. }
  62. func (c *decoratedCollection) FindId(id interface{}) Query {
  63. promise, err := c.brk.Allow()
  64. if err != nil {
  65. return rejectedQuery{}
  66. }
  67. startTime := timex.Now()
  68. return promisedQuery{
  69. Query: c.collection.FindId(id),
  70. promise: keepablePromise{
  71. promise: promise,
  72. log: func(err error) {
  73. duration := timex.Since(startTime)
  74. c.logDuration("findId", duration, err, id)
  75. },
  76. },
  77. }
  78. }
  79. func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
  80. return c.brk.DoWithAcceptable(func() error {
  81. startTime := timex.Now()
  82. defer func() {
  83. duration := timex.Since(startTime)
  84. c.logDuration("insert", duration, err, docs...)
  85. }()
  86. return c.collection.Insert(docs...)
  87. }, acceptable)
  88. }
  89. func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
  90. promise, err := c.brk.Allow()
  91. if err != nil {
  92. return rejectedPipe{}
  93. }
  94. startTime := timex.Now()
  95. return promisedPipe{
  96. Pipe: c.collection.Pipe(pipeline),
  97. promise: keepablePromise{
  98. promise: promise,
  99. log: func(err error) {
  100. duration := timex.Since(startTime)
  101. c.logDuration("pipe", duration, err, pipeline)
  102. },
  103. },
  104. }
  105. }
  106. func (c *decoratedCollection) Remove(selector interface{}) (err error) {
  107. return c.brk.DoWithAcceptable(func() error {
  108. startTime := timex.Now()
  109. defer func() {
  110. duration := timex.Since(startTime)
  111. c.logDuration("remove", duration, err, selector)
  112. }()
  113. return c.collection.Remove(selector)
  114. }, acceptable)
  115. }
  116. func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
  117. err = c.brk.DoWithAcceptable(func() error {
  118. startTime := timex.Now()
  119. defer func() {
  120. duration := timex.Since(startTime)
  121. c.logDuration("removeAll", duration, err, selector)
  122. }()
  123. info, err = c.collection.RemoveAll(selector)
  124. return err
  125. }, acceptable)
  126. return
  127. }
  128. func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
  129. return c.brk.DoWithAcceptable(func() error {
  130. startTime := timex.Now()
  131. defer func() {
  132. duration := timex.Since(startTime)
  133. c.logDuration("removeId", duration, err, id)
  134. }()
  135. return c.collection.RemoveId(id)
  136. }, acceptable)
  137. }
  138. func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
  139. return c.brk.DoWithAcceptable(func() error {
  140. startTime := timex.Now()
  141. defer func() {
  142. duration := timex.Since(startTime)
  143. c.logDuration("update", duration, err, selector, update)
  144. }()
  145. return c.collection.Update(selector, update)
  146. }, acceptable)
  147. }
  148. func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
  149. return c.brk.DoWithAcceptable(func() error {
  150. startTime := timex.Now()
  151. defer func() {
  152. duration := timex.Since(startTime)
  153. c.logDuration("updateId", duration, err, id, update)
  154. }()
  155. return c.collection.UpdateId(id, update)
  156. }, acceptable)
  157. }
  158. func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
  159. err = c.brk.DoWithAcceptable(func() error {
  160. startTime := timex.Now()
  161. defer func() {
  162. duration := timex.Since(startTime)
  163. c.logDuration("upsert", duration, err, selector, update)
  164. }()
  165. info, err = c.collection.Upsert(selector, update)
  166. return err
  167. }, acceptable)
  168. return
  169. }
  170. func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
  171. content, e := json.Marshal(docs)
  172. if e != nil {
  173. logx.Error(err)
  174. } else if err != nil {
  175. if duration > slowThreshold {
  176. logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
  177. c.name, method, err.Error(), string(content))
  178. } else {
  179. logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
  180. c.name, method, err.Error(), string(content))
  181. }
  182. } else {
  183. if duration > slowThreshold {
  184. logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
  185. c.name, method, string(content))
  186. } else {
  187. logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
  188. }
  189. }
  190. }
  191. func (p keepablePromise) accept(err error) error {
  192. p.promise.Accept()
  193. p.log(err)
  194. return err
  195. }
  196. func (p keepablePromise) keep(err error) error {
  197. if acceptable(err) {
  198. p.promise.Accept()
  199. } else {
  200. p.promise.Reject(err.Error())
  201. }
  202. p.log(err)
  203. return err
  204. }
  205. func acceptable(err error) bool {
  206. return err == nil || err == mgo.ErrNotFound
  207. }