123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package mongo
- import (
- "encoding/json"
- "time"
- "github.com/globalsign/mgo"
- "github.com/tal-tech/go-zero/core/breaker"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/stores/mongo/internal"
- "github.com/tal-tech/go-zero/core/timex"
- )
- const slowThreshold = time.Millisecond * 500
- // ErrNotFound is an alias of mgo.ErrNotFound.
- var ErrNotFound = mgo.ErrNotFound
- type (
- // Collection interface represents a mongo connection.
- Collection interface {
- Find(query interface{}) Query
- FindId(id interface{}) Query
- Insert(docs ...interface{}) error
- Pipe(pipeline interface{}) Pipe
- Remove(selector interface{}) error
- RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
- RemoveId(id interface{}) error
- Update(selector, update interface{}) error
- UpdateId(id, update interface{}) error
- Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
- }
- decoratedCollection struct {
- name string
- collection internal.MgoCollection
- brk breaker.Breaker
- }
- keepablePromise struct {
- promise breaker.Promise
- log func(error)
- }
- )
- func newCollection(collection *mgo.Collection, brk breaker.Breaker) Collection {
- return &decoratedCollection{
- name: collection.FullName,
- collection: collection,
- brk: brk,
- }
- }
- func (c *decoratedCollection) Find(query interface{}) Query {
- promise, err := c.brk.Allow()
- if err != nil {
- return rejectedQuery{}
- }
- startTime := timex.Now()
- return promisedQuery{
- Query: c.collection.Find(query),
- promise: keepablePromise{
- promise: promise,
- log: func(err error) {
- duration := timex.Since(startTime)
- c.logDuration("find", duration, err, query)
- },
- },
- }
- }
- func (c *decoratedCollection) FindId(id interface{}) Query {
- promise, err := c.brk.Allow()
- if err != nil {
- return rejectedQuery{}
- }
- startTime := timex.Now()
- return promisedQuery{
- Query: c.collection.FindId(id),
- promise: keepablePromise{
- promise: promise,
- log: func(err error) {
- duration := timex.Since(startTime)
- c.logDuration("findId", duration, err, id)
- },
- },
- }
- }
- func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
- return c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("insert", duration, err, docs...)
- }()
- return c.collection.Insert(docs...)
- }, acceptable)
- }
- func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
- promise, err := c.brk.Allow()
- if err != nil {
- return rejectedPipe{}
- }
- startTime := timex.Now()
- return promisedPipe{
- Pipe: c.collection.Pipe(pipeline),
- promise: keepablePromise{
- promise: promise,
- log: func(err error) {
- duration := timex.Since(startTime)
- c.logDuration("pipe", duration, err, pipeline)
- },
- },
- }
- }
- func (c *decoratedCollection) Remove(selector interface{}) (err error) {
- return c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("remove", duration, err, selector)
- }()
- return c.collection.Remove(selector)
- }, acceptable)
- }
- func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("removeAll", duration, err, selector)
- }()
- info, err = c.collection.RemoveAll(selector)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
- return c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("removeId", duration, err, id)
- }()
- return c.collection.RemoveId(id)
- }, acceptable)
- }
- func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
- return c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("update", duration, err, selector, update)
- }()
- return c.collection.Update(selector, update)
- }, acceptable)
- }
- func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
- return c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("updateId", duration, err, id, update)
- }()
- return c.collection.UpdateId(id, update)
- }, acceptable)
- }
- func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
- err = c.brk.DoWithAcceptable(func() error {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- c.logDuration("upsert", duration, err, selector, update)
- }()
- info, err = c.collection.Upsert(selector, update)
- return err
- }, acceptable)
- return
- }
- func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
- content, e := json.Marshal(docs)
- if e != nil {
- logx.Error(err)
- } else if err != nil {
- if duration > slowThreshold {
- logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
- c.name, method, err.Error(), string(content))
- } else {
- logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
- c.name, method, err.Error(), string(content))
- }
- } else {
- if duration > slowThreshold {
- logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
- c.name, method, string(content))
- } else {
- logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
- }
- }
- }
- func (p keepablePromise) accept(err error) error {
- p.promise.Accept()
- p.log(err)
- return err
- }
- func (p keepablePromise) keep(err error) error {
- if acceptable(err) {
- p.promise.Accept()
- } else {
- p.promise.Reject(err.Error())
- }
- p.log(err)
- return err
- }
- func acceptable(err error) bool {
- return err == nil || err == mgo.ErrNotFound
- }
|