cachedcollection.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package mongoc
  2. import (
  3. "github.com/globalsign/mgo"
  4. "github.com/tal-tech/go-zero/core/stores/cache"
  5. "github.com/tal-tech/go-zero/core/stores/mongo"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. var (
  9. // ErrNotFound is an alias of mgo.ErrNotFound.
  10. ErrNotFound = mgo.ErrNotFound
  11. // can't use one SharedCalls per conn, because multiple conns may share the same cache key.
  12. sharedCalls = syncx.NewSharedCalls()
  13. stats = cache.NewStat("mongoc")
  14. )
  15. type (
  16. // QueryOption defines the method to customize a mongo query.
  17. QueryOption func(query mongo.Query) mongo.Query
  18. // CachedCollection interface represents a mongo collection with cache.
  19. CachedCollection interface {
  20. Count(query interface{}) (int, error)
  21. DelCache(keys ...string) error
  22. FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error
  23. FindOne(v interface{}, key string, query interface{}) error
  24. FindOneNoCache(v interface{}, query interface{}) error
  25. FindOneId(v interface{}, key string, id interface{}) error
  26. FindOneIdNoCache(v interface{}, id interface{}) error
  27. GetCache(key string, v interface{}) error
  28. Insert(docs ...interface{}) error
  29. Pipe(pipeline interface{}) mongo.Pipe
  30. Remove(selector interface{}, keys ...string) error
  31. RemoveNoCache(selector interface{}) error
  32. RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error)
  33. RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error)
  34. RemoveId(id interface{}, keys ...string) error
  35. RemoveIdNoCache(id interface{}) error
  36. SetCache(key string, v interface{}) error
  37. Update(selector, update interface{}, keys ...string) error
  38. UpdateNoCache(selector, update interface{}) error
  39. UpdateId(id, update interface{}, keys ...string) error
  40. UpdateIdNoCache(id, update interface{}) error
  41. Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error)
  42. UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error)
  43. }
  44. cachedCollection struct {
  45. collection mongo.Collection
  46. cache cache.Cache
  47. }
  48. )
  49. func newCollection(collection mongo.Collection, c cache.Cache) CachedCollection {
  50. return &cachedCollection{
  51. collection: collection,
  52. cache: c,
  53. }
  54. }
  55. func (c *cachedCollection) Count(query interface{}) (int, error) {
  56. return c.collection.Find(query).Count()
  57. }
  58. func (c *cachedCollection) DelCache(keys ...string) error {
  59. return c.cache.Del(keys...)
  60. }
  61. func (c *cachedCollection) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error {
  62. q := c.collection.Find(query)
  63. for _, opt := range opts {
  64. q = opt(q)
  65. }
  66. return q.All(v)
  67. }
  68. func (c *cachedCollection) FindOne(v interface{}, key string, query interface{}) error {
  69. return c.cache.Take(v, key, func(v interface{}) error {
  70. q := c.collection.Find(query)
  71. return q.One(v)
  72. })
  73. }
  74. func (c *cachedCollection) FindOneNoCache(v interface{}, query interface{}) error {
  75. q := c.collection.Find(query)
  76. return q.One(v)
  77. }
  78. func (c *cachedCollection) FindOneId(v interface{}, key string, id interface{}) error {
  79. return c.cache.Take(v, key, func(v interface{}) error {
  80. q := c.collection.FindId(id)
  81. return q.One(v)
  82. })
  83. }
  84. func (c *cachedCollection) FindOneIdNoCache(v interface{}, id interface{}) error {
  85. q := c.collection.FindId(id)
  86. return q.One(v)
  87. }
  88. func (c *cachedCollection) GetCache(key string, v interface{}) error {
  89. return c.cache.Get(key, v)
  90. }
  91. func (c *cachedCollection) Insert(docs ...interface{}) error {
  92. return c.collection.Insert(docs...)
  93. }
  94. func (c *cachedCollection) Pipe(pipeline interface{}) mongo.Pipe {
  95. return c.collection.Pipe(pipeline)
  96. }
  97. func (c *cachedCollection) Remove(selector interface{}, keys ...string) error {
  98. if err := c.RemoveNoCache(selector); err != nil {
  99. return err
  100. }
  101. return c.DelCache(keys...)
  102. }
  103. func (c *cachedCollection) RemoveNoCache(selector interface{}) error {
  104. return c.collection.Remove(selector)
  105. }
  106. func (c *cachedCollection) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) {
  107. info, err := c.RemoveAllNoCache(selector)
  108. if err != nil {
  109. return nil, err
  110. }
  111. if err := c.DelCache(keys...); err != nil {
  112. return nil, err
  113. }
  114. return info, nil
  115. }
  116. func (c *cachedCollection) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) {
  117. return c.collection.RemoveAll(selector)
  118. }
  119. func (c *cachedCollection) RemoveId(id interface{}, keys ...string) error {
  120. if err := c.RemoveIdNoCache(id); err != nil {
  121. return err
  122. }
  123. return c.DelCache(keys...)
  124. }
  125. func (c *cachedCollection) RemoveIdNoCache(id interface{}) error {
  126. return c.collection.RemoveId(id)
  127. }
  128. func (c *cachedCollection) SetCache(key string, v interface{}) error {
  129. return c.cache.Set(key, v)
  130. }
  131. func (c *cachedCollection) Update(selector, update interface{}, keys ...string) error {
  132. if err := c.UpdateNoCache(selector, update); err != nil {
  133. return err
  134. }
  135. return c.DelCache(keys...)
  136. }
  137. func (c *cachedCollection) UpdateNoCache(selector, update interface{}) error {
  138. return c.collection.Update(selector, update)
  139. }
  140. func (c *cachedCollection) UpdateId(id, update interface{}, keys ...string) error {
  141. if err := c.UpdateIdNoCache(id, update); err != nil {
  142. return err
  143. }
  144. return c.DelCache(keys...)
  145. }
  146. func (c *cachedCollection) UpdateIdNoCache(id, update interface{}) error {
  147. return c.collection.UpdateId(id, update)
  148. }
  149. func (c *cachedCollection) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) {
  150. info, err := c.UpsertNoCache(selector, update)
  151. if err != nil {
  152. return nil, err
  153. }
  154. if err := c.DelCache(keys...); err != nil {
  155. return nil, err
  156. }
  157. return info, nil
  158. }
  159. func (c *cachedCollection) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) {
  160. return c.collection.Upsert(selector, update)
  161. }