cachedcollection.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package mongoc
  2. import (
  3. "github.com/globalsign/mgo"
  4. "github.com/tal-tech/go-zero/core/stores/cache"
  5. "github.com/tal-tech/go-zero/core/stores/mongo"
  6. "github.com/tal-tech/go-zero/core/syncx"
  7. )
  8. var (
  9. ErrNotFound = mgo.ErrNotFound
  10. // can't use one SharedCalls per conn, because multiple conns may share the same cache key.
  11. sharedCalls = syncx.NewSharedCalls()
  12. stats = cache.NewCacheStat("mongoc")
  13. )
  14. type (
  15. QueryOption func(query mongo.Query) mongo.Query
  16. cachedCollection struct {
  17. collection mongo.Collection
  18. cache cache.Cache
  19. }
  20. )
  21. func newCollection(collection mongo.Collection, c cache.Cache) *cachedCollection {
  22. return &cachedCollection{
  23. collection: collection,
  24. cache: c,
  25. }
  26. }
  27. func (c *cachedCollection) Count(query interface{}) (int, error) {
  28. return c.collection.Find(query).Count()
  29. }
  30. func (c *cachedCollection) DelCache(keys ...string) error {
  31. return c.cache.DelCache(keys...)
  32. }
  33. func (c *cachedCollection) GetCache(key string, v interface{}) error {
  34. return c.cache.GetCache(key, v)
  35. }
  36. func (c *cachedCollection) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error {
  37. q := c.collection.Find(query)
  38. for _, opt := range opts {
  39. q = opt(q)
  40. }
  41. return q.All(v)
  42. }
  43. func (c *cachedCollection) FindOne(v interface{}, key string, query interface{}) error {
  44. return c.cache.Take(v, key, func(v interface{}) error {
  45. q := c.collection.Find(query)
  46. return q.One(v)
  47. })
  48. }
  49. func (c *cachedCollection) FindOneNoCache(v interface{}, query interface{}) error {
  50. q := c.collection.Find(query)
  51. return q.One(v)
  52. }
  53. func (c *cachedCollection) FindOneId(v interface{}, key string, id interface{}) error {
  54. return c.cache.Take(v, key, func(v interface{}) error {
  55. q := c.collection.FindId(id)
  56. return q.One(v)
  57. })
  58. }
  59. func (c *cachedCollection) FindOneIdNoCache(v interface{}, id interface{}) error {
  60. q := c.collection.FindId(id)
  61. return q.One(v)
  62. }
  63. func (c *cachedCollection) Insert(docs ...interface{}) error {
  64. return c.collection.Insert(docs...)
  65. }
  66. func (c *cachedCollection) Pipe(pipeline interface{}) mongo.Pipe {
  67. return c.collection.Pipe(pipeline)
  68. }
  69. func (c *cachedCollection) Remove(selector interface{}, keys ...string) error {
  70. if err := c.RemoveNoCache(selector); err != nil {
  71. return err
  72. }
  73. return c.DelCache(keys...)
  74. }
  75. func (c *cachedCollection) RemoveNoCache(selector interface{}) error {
  76. return c.collection.Remove(selector)
  77. }
  78. func (c *cachedCollection) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) {
  79. info, err := c.RemoveAllNoCache(selector)
  80. if err != nil {
  81. return nil, err
  82. }
  83. if err := c.DelCache(keys...); err != nil {
  84. return nil, err
  85. }
  86. return info, nil
  87. }
  88. func (c *cachedCollection) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) {
  89. return c.collection.RemoveAll(selector)
  90. }
  91. func (c *cachedCollection) RemoveId(id interface{}, keys ...string) error {
  92. if err := c.RemoveIdNoCache(id); err != nil {
  93. return err
  94. }
  95. return c.DelCache(keys...)
  96. }
  97. func (c *cachedCollection) RemoveIdNoCache(id interface{}) error {
  98. return c.collection.RemoveId(id)
  99. }
  100. func (c *cachedCollection) SetCache(key string, v interface{}) error {
  101. return c.cache.SetCache(key, v)
  102. }
  103. func (c *cachedCollection) Update(selector, update interface{}, keys ...string) error {
  104. if err := c.UpdateNoCache(selector, update); err != nil {
  105. return err
  106. }
  107. return c.DelCache(keys...)
  108. }
  109. func (c *cachedCollection) UpdateNoCache(selector, update interface{}) error {
  110. return c.collection.Update(selector, update)
  111. }
  112. func (c *cachedCollection) UpdateId(id, update interface{}, keys ...string) error {
  113. if err := c.UpdateIdNoCache(id, update); err != nil {
  114. return err
  115. }
  116. return c.DelCache(keys...)
  117. }
  118. func (c *cachedCollection) UpdateIdNoCache(id, update interface{}) error {
  119. return c.collection.UpdateId(id, update)
  120. }
  121. func (c *cachedCollection) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) {
  122. info, err := c.UpsertNoCache(selector, update)
  123. if err != nil {
  124. return nil, err
  125. }
  126. if err := c.DelCache(keys...); err != nil {
  127. return nil, err
  128. }
  129. return info, nil
  130. }
  131. func (c *cachedCollection) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) {
  132. return c.collection.Upsert(selector, update)
  133. }