123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package mongo
- import (
- "time"
- "github.com/globalsign/mgo"
- "github.com/tal-tech/go-zero/core/breaker"
- )
- type (
- // Query interface represents a mongo query.
- Query interface {
- All(result interface{}) error
- Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error)
- Batch(n int) Query
- Collation(collation *mgo.Collation) Query
- Comment(comment string) Query
- Count() (int, error)
- Distinct(key string, result interface{}) error
- Explain(result interface{}) error
- For(result interface{}, f func() error) error
- Hint(indexKey ...string) Query
- Iter() Iter
- Limit(n int) Query
- LogReplay() Query
- MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error)
- One(result interface{}) error
- Prefetch(p float64) Query
- Select(selector interface{}) Query
- SetMaxScan(n int) Query
- SetMaxTime(d time.Duration) Query
- Skip(n int) Query
- Snapshot() Query
- Sort(fields ...string) Query
- Tail(timeout time.Duration) Iter
- }
- promisedQuery struct {
- *mgo.Query
- promise keepablePromise
- }
- rejectedQuery struct{}
- )
- func (q promisedQuery) All(result interface{}) error {
- return q.promise.keep(q.Query.All(result))
- }
- func (q promisedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
- info, err := q.Query.Apply(change, result)
- return info, q.promise.keep(err)
- }
- func (q promisedQuery) Batch(n int) Query {
- return promisedQuery{
- Query: q.Query.Batch(n),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Collation(collation *mgo.Collation) Query {
- return promisedQuery{
- Query: q.Query.Collation(collation),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Comment(comment string) Query {
- return promisedQuery{
- Query: q.Query.Comment(comment),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Count() (int, error) {
- v, err := q.Query.Count()
- return v, q.promise.keep(err)
- }
- func (q promisedQuery) Distinct(key string, result interface{}) error {
- return q.promise.keep(q.Query.Distinct(key, result))
- }
- func (q promisedQuery) Explain(result interface{}) error {
- return q.promise.keep(q.Query.Explain(result))
- }
- func (q promisedQuery) For(result interface{}, f func() error) error {
- var ferr error
- err := q.Query.For(result, func() error {
- ferr = f()
- return ferr
- })
- if ferr == err {
- return q.promise.accept(err)
- }
- return q.promise.keep(err)
- }
- func (q promisedQuery) Hint(indexKey ...string) Query {
- return promisedQuery{
- Query: q.Query.Hint(indexKey...),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Iter() Iter {
- return promisedIter{
- Iter: q.Query.Iter(),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Limit(n int) Query {
- return promisedQuery{
- Query: q.Query.Limit(n),
- promise: q.promise,
- }
- }
- func (q promisedQuery) LogReplay() Query {
- return promisedQuery{
- Query: q.Query.LogReplay(),
- promise: q.promise,
- }
- }
- func (q promisedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
- info, err := q.Query.MapReduce(job, result)
- return info, q.promise.keep(err)
- }
- func (q promisedQuery) One(result interface{}) error {
- return q.promise.keep(q.Query.One(result))
- }
- func (q promisedQuery) Prefetch(p float64) Query {
- return promisedQuery{
- Query: q.Query.Prefetch(p),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Select(selector interface{}) Query {
- return promisedQuery{
- Query: q.Query.Select(selector),
- promise: q.promise,
- }
- }
- func (q promisedQuery) SetMaxScan(n int) Query {
- return promisedQuery{
- Query: q.Query.SetMaxScan(n),
- promise: q.promise,
- }
- }
- func (q promisedQuery) SetMaxTime(d time.Duration) Query {
- return promisedQuery{
- Query: q.Query.SetMaxTime(d),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Skip(n int) Query {
- return promisedQuery{
- Query: q.Query.Skip(n),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Snapshot() Query {
- return promisedQuery{
- Query: q.Query.Snapshot(),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Sort(fields ...string) Query {
- return promisedQuery{
- Query: q.Query.Sort(fields...),
- promise: q.promise,
- }
- }
- func (q promisedQuery) Tail(timeout time.Duration) Iter {
- return promisedIter{
- Iter: q.Query.Tail(timeout),
- promise: q.promise,
- }
- }
- func (q rejectedQuery) All(result interface{}) error {
- return breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
- return nil, breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Batch(n int) Query {
- return q
- }
- func (q rejectedQuery) Collation(collation *mgo.Collation) Query {
- return q
- }
- func (q rejectedQuery) Comment(comment string) Query {
- return q
- }
- func (q rejectedQuery) Count() (int, error) {
- return 0, breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Distinct(key string, result interface{}) error {
- return breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Explain(result interface{}) error {
- return breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) For(result interface{}, f func() error) error {
- return breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Hint(indexKey ...string) Query {
- return q
- }
- func (q rejectedQuery) Iter() Iter {
- return rejectedIter{}
- }
- func (q rejectedQuery) Limit(n int) Query {
- return q
- }
- func (q rejectedQuery) LogReplay() Query {
- return q
- }
- func (q rejectedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
- return nil, breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) One(result interface{}) error {
- return breaker.ErrServiceUnavailable
- }
- func (q rejectedQuery) Prefetch(p float64) Query {
- return q
- }
- func (q rejectedQuery) Select(selector interface{}) Query {
- return q
- }
- func (q rejectedQuery) SetMaxScan(n int) Query {
- return q
- }
- func (q rejectedQuery) SetMaxTime(d time.Duration) Query {
- return q
- }
- func (q rejectedQuery) Skip(n int) Query {
- return q
- }
- func (q rejectedQuery) Snapshot() Query {
- return q
- }
- func (q rejectedQuery) Sort(fields ...string) Query {
- return q
- }
- func (q rejectedQuery) Tail(timeout time.Duration) Iter {
- return rejectedIter{}
- }
|