query.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package mongo
  2. import (
  3. "time"
  4. "github.com/globalsign/mgo"
  5. "github.com/tal-tech/go-zero/core/breaker"
  6. )
  7. type (
  8. // Query interface represents a mongo query.
  9. Query interface {
  10. All(result interface{}) error
  11. Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error)
  12. Batch(n int) Query
  13. Collation(collation *mgo.Collation) Query
  14. Comment(comment string) Query
  15. Count() (int, error)
  16. Distinct(key string, result interface{}) error
  17. Explain(result interface{}) error
  18. For(result interface{}, f func() error) error
  19. Hint(indexKey ...string) Query
  20. Iter() Iter
  21. Limit(n int) Query
  22. LogReplay() Query
  23. MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error)
  24. One(result interface{}) error
  25. Prefetch(p float64) Query
  26. Select(selector interface{}) Query
  27. SetMaxScan(n int) Query
  28. SetMaxTime(d time.Duration) Query
  29. Skip(n int) Query
  30. Snapshot() Query
  31. Sort(fields ...string) Query
  32. Tail(timeout time.Duration) Iter
  33. }
  34. promisedQuery struct {
  35. *mgo.Query
  36. promise keepablePromise
  37. }
  38. rejectedQuery struct{}
  39. )
  40. func (q promisedQuery) All(result interface{}) error {
  41. return q.promise.keep(q.Query.All(result))
  42. }
  43. func (q promisedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
  44. info, err := q.Query.Apply(change, result)
  45. return info, q.promise.keep(err)
  46. }
  47. func (q promisedQuery) Batch(n int) Query {
  48. return promisedQuery{
  49. Query: q.Query.Batch(n),
  50. promise: q.promise,
  51. }
  52. }
  53. func (q promisedQuery) Collation(collation *mgo.Collation) Query {
  54. return promisedQuery{
  55. Query: q.Query.Collation(collation),
  56. promise: q.promise,
  57. }
  58. }
  59. func (q promisedQuery) Comment(comment string) Query {
  60. return promisedQuery{
  61. Query: q.Query.Comment(comment),
  62. promise: q.promise,
  63. }
  64. }
  65. func (q promisedQuery) Count() (int, error) {
  66. v, err := q.Query.Count()
  67. return v, q.promise.keep(err)
  68. }
  69. func (q promisedQuery) Distinct(key string, result interface{}) error {
  70. return q.promise.keep(q.Query.Distinct(key, result))
  71. }
  72. func (q promisedQuery) Explain(result interface{}) error {
  73. return q.promise.keep(q.Query.Explain(result))
  74. }
  75. func (q promisedQuery) For(result interface{}, f func() error) error {
  76. var ferr error
  77. err := q.Query.For(result, func() error {
  78. ferr = f()
  79. return ferr
  80. })
  81. if ferr == err {
  82. return q.promise.accept(err)
  83. }
  84. return q.promise.keep(err)
  85. }
  86. func (q promisedQuery) Hint(indexKey ...string) Query {
  87. return promisedQuery{
  88. Query: q.Query.Hint(indexKey...),
  89. promise: q.promise,
  90. }
  91. }
  92. func (q promisedQuery) Iter() Iter {
  93. return promisedIter{
  94. Iter: q.Query.Iter(),
  95. promise: q.promise,
  96. }
  97. }
  98. func (q promisedQuery) Limit(n int) Query {
  99. return promisedQuery{
  100. Query: q.Query.Limit(n),
  101. promise: q.promise,
  102. }
  103. }
  104. func (q promisedQuery) LogReplay() Query {
  105. return promisedQuery{
  106. Query: q.Query.LogReplay(),
  107. promise: q.promise,
  108. }
  109. }
  110. func (q promisedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
  111. info, err := q.Query.MapReduce(job, result)
  112. return info, q.promise.keep(err)
  113. }
  114. func (q promisedQuery) One(result interface{}) error {
  115. return q.promise.keep(q.Query.One(result))
  116. }
  117. func (q promisedQuery) Prefetch(p float64) Query {
  118. return promisedQuery{
  119. Query: q.Query.Prefetch(p),
  120. promise: q.promise,
  121. }
  122. }
  123. func (q promisedQuery) Select(selector interface{}) Query {
  124. return promisedQuery{
  125. Query: q.Query.Select(selector),
  126. promise: q.promise,
  127. }
  128. }
  129. func (q promisedQuery) SetMaxScan(n int) Query {
  130. return promisedQuery{
  131. Query: q.Query.SetMaxScan(n),
  132. promise: q.promise,
  133. }
  134. }
  135. func (q promisedQuery) SetMaxTime(d time.Duration) Query {
  136. return promisedQuery{
  137. Query: q.Query.SetMaxTime(d),
  138. promise: q.promise,
  139. }
  140. }
  141. func (q promisedQuery) Skip(n int) Query {
  142. return promisedQuery{
  143. Query: q.Query.Skip(n),
  144. promise: q.promise,
  145. }
  146. }
  147. func (q promisedQuery) Snapshot() Query {
  148. return promisedQuery{
  149. Query: q.Query.Snapshot(),
  150. promise: q.promise,
  151. }
  152. }
  153. func (q promisedQuery) Sort(fields ...string) Query {
  154. return promisedQuery{
  155. Query: q.Query.Sort(fields...),
  156. promise: q.promise,
  157. }
  158. }
  159. func (q promisedQuery) Tail(timeout time.Duration) Iter {
  160. return promisedIter{
  161. Iter: q.Query.Tail(timeout),
  162. promise: q.promise,
  163. }
  164. }
  165. func (q rejectedQuery) All(result interface{}) error {
  166. return breaker.ErrServiceUnavailable
  167. }
  168. func (q rejectedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
  169. return nil, breaker.ErrServiceUnavailable
  170. }
  171. func (q rejectedQuery) Batch(n int) Query {
  172. return q
  173. }
  174. func (q rejectedQuery) Collation(collation *mgo.Collation) Query {
  175. return q
  176. }
  177. func (q rejectedQuery) Comment(comment string) Query {
  178. return q
  179. }
  180. func (q rejectedQuery) Count() (int, error) {
  181. return 0, breaker.ErrServiceUnavailable
  182. }
  183. func (q rejectedQuery) Distinct(key string, result interface{}) error {
  184. return breaker.ErrServiceUnavailable
  185. }
  186. func (q rejectedQuery) Explain(result interface{}) error {
  187. return breaker.ErrServiceUnavailable
  188. }
  189. func (q rejectedQuery) For(result interface{}, f func() error) error {
  190. return breaker.ErrServiceUnavailable
  191. }
  192. func (q rejectedQuery) Hint(indexKey ...string) Query {
  193. return q
  194. }
  195. func (q rejectedQuery) Iter() Iter {
  196. return rejectedIter{}
  197. }
  198. func (q rejectedQuery) Limit(n int) Query {
  199. return q
  200. }
  201. func (q rejectedQuery) LogReplay() Query {
  202. return q
  203. }
  204. func (q rejectedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
  205. return nil, breaker.ErrServiceUnavailable
  206. }
  207. func (q rejectedQuery) One(result interface{}) error {
  208. return breaker.ErrServiceUnavailable
  209. }
  210. func (q rejectedQuery) Prefetch(p float64) Query {
  211. return q
  212. }
  213. func (q rejectedQuery) Select(selector interface{}) Query {
  214. return q
  215. }
  216. func (q rejectedQuery) SetMaxScan(n int) Query {
  217. return q
  218. }
  219. func (q rejectedQuery) SetMaxTime(d time.Duration) Query {
  220. return q
  221. }
  222. func (q rejectedQuery) Skip(n int) Query {
  223. return q
  224. }
  225. func (q rejectedQuery) Snapshot() Query {
  226. return q
  227. }
  228. func (q rejectedQuery) Sort(fields ...string) Query {
  229. return q
  230. }
  231. func (q rejectedQuery) Tail(timeout time.Duration) Iter {
  232. return rejectedIter{}
  233. }