collection.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package mongo
  2. import (
  3. "encoding/json"
  4. "time"
  5. "github.com/globalsign/mgo"
  6. "github.com/tal-tech/go-zero/core/breaker"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. "github.com/tal-tech/go-zero/core/timex"
  9. )
  10. const slowThreshold = time.Millisecond * 500
  11. var ErrNotFound = mgo.ErrNotFound
  12. type (
  13. Collection interface {
  14. Find(query interface{}) Query
  15. FindId(id interface{}) Query
  16. Insert(docs ...interface{}) error
  17. Pipe(pipeline interface{}) Pipe
  18. Remove(selector interface{}) error
  19. RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
  20. RemoveId(id interface{}) error
  21. Update(selector, update interface{}) error
  22. UpdateId(id, update interface{}) error
  23. Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
  24. }
  25. decoratedCollection struct {
  26. *mgo.Collection
  27. brk breaker.Breaker
  28. }
  29. keepablePromise struct {
  30. promise breaker.Promise
  31. log func(error)
  32. }
  33. )
  34. func newCollection(collection *mgo.Collection) Collection {
  35. return &decoratedCollection{
  36. Collection: collection,
  37. brk: breaker.NewBreaker(),
  38. }
  39. }
  40. func (c *decoratedCollection) Find(query interface{}) Query {
  41. promise, err := c.brk.Allow()
  42. if err != nil {
  43. return rejectedQuery{}
  44. }
  45. startTime := timex.Now()
  46. return promisedQuery{
  47. Query: c.Collection.Find(query),
  48. promise: keepablePromise{
  49. promise: promise,
  50. log: func(err error) {
  51. duration := timex.Since(startTime)
  52. c.logDuration("find", duration, err, query)
  53. },
  54. },
  55. }
  56. }
  57. func (c *decoratedCollection) FindId(id interface{}) Query {
  58. promise, err := c.brk.Allow()
  59. if err != nil {
  60. return rejectedQuery{}
  61. }
  62. startTime := timex.Now()
  63. return promisedQuery{
  64. Query: c.Collection.FindId(id),
  65. promise: keepablePromise{
  66. promise: promise,
  67. log: func(err error) {
  68. duration := timex.Since(startTime)
  69. c.logDuration("findId", duration, err, id)
  70. },
  71. },
  72. }
  73. }
  74. func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
  75. return c.brk.DoWithAcceptable(func() error {
  76. startTime := timex.Now()
  77. defer func() {
  78. duration := timex.Since(startTime)
  79. c.logDuration("insert", duration, err, docs...)
  80. }()
  81. return c.Collection.Insert(docs...)
  82. }, acceptable)
  83. }
  84. func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
  85. promise, err := c.brk.Allow()
  86. if err != nil {
  87. return rejectedPipe{}
  88. }
  89. startTime := timex.Now()
  90. return promisedPipe{
  91. Pipe: c.Collection.Pipe(pipeline),
  92. promise: keepablePromise{
  93. promise: promise,
  94. log: func(err error) {
  95. duration := timex.Since(startTime)
  96. c.logDuration("pipe", duration, err, pipeline)
  97. },
  98. },
  99. }
  100. }
  101. func (c *decoratedCollection) Remove(selector interface{}) (err error) {
  102. return c.brk.DoWithAcceptable(func() error {
  103. startTime := timex.Now()
  104. defer func() {
  105. duration := timex.Since(startTime)
  106. c.logDuration("remove", duration, err, selector)
  107. }()
  108. return c.Collection.Remove(selector)
  109. }, acceptable)
  110. }
  111. func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
  112. err = c.brk.DoWithAcceptable(func() error {
  113. startTime := timex.Now()
  114. defer func() {
  115. duration := timex.Since(startTime)
  116. c.logDuration("removeAll", duration, err, selector)
  117. }()
  118. info, err = c.Collection.RemoveAll(selector)
  119. return err
  120. }, acceptable)
  121. return
  122. }
  123. func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
  124. return c.brk.DoWithAcceptable(func() error {
  125. startTime := timex.Now()
  126. defer func() {
  127. duration := timex.Since(startTime)
  128. c.logDuration("removeId", duration, err, id)
  129. }()
  130. return c.Collection.RemoveId(id)
  131. }, acceptable)
  132. }
  133. func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
  134. return c.brk.DoWithAcceptable(func() error {
  135. startTime := timex.Now()
  136. defer func() {
  137. duration := timex.Since(startTime)
  138. c.logDuration("update", duration, err, selector, update)
  139. }()
  140. return c.Collection.Update(selector, update)
  141. }, acceptable)
  142. }
  143. func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
  144. return c.brk.DoWithAcceptable(func() error {
  145. startTime := timex.Now()
  146. defer func() {
  147. duration := timex.Since(startTime)
  148. c.logDuration("updateId", duration, err, id, update)
  149. }()
  150. return c.Collection.UpdateId(id, update)
  151. }, acceptable)
  152. }
  153. func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
  154. err = c.brk.DoWithAcceptable(func() error {
  155. startTime := timex.Now()
  156. defer func() {
  157. duration := timex.Since(startTime)
  158. c.logDuration("upsert", duration, err, selector, update)
  159. }()
  160. info, err = c.Collection.Upsert(selector, update)
  161. return err
  162. }, acceptable)
  163. return
  164. }
  165. func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
  166. content, e := json.Marshal(docs)
  167. if e != nil {
  168. logx.Error(err)
  169. } else if err != nil {
  170. if duration > slowThreshold {
  171. logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
  172. c.FullName, method, err.Error(), string(content))
  173. } else {
  174. logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
  175. c.FullName, method, err.Error(), string(content))
  176. }
  177. } else {
  178. if duration > slowThreshold {
  179. logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
  180. c.FullName, method, string(content))
  181. } else {
  182. logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.FullName, method, string(content))
  183. }
  184. }
  185. }
  186. func (p keepablePromise) accept(err error) error {
  187. p.promise.Accept()
  188. p.log(err)
  189. return err
  190. }
  191. func (p keepablePromise) keep(err error) error {
  192. if acceptable(err) {
  193. p.promise.Accept()
  194. } else {
  195. p.promise.Reject(err.Error())
  196. }
  197. p.log(err)
  198. return err
  199. }
  200. func acceptable(err error) bool {
  201. return err == nil || err == mgo.ErrNotFound
  202. }