// query_executor.go (2.4 KB)

  1. package gocql
  2. import (
  3. "errors"
  4. "time"
  5. )
  6. // ErrUnknownRetryType is returned if the retry policy returns a retry type
  7. // unknown to the query executor.
  8. var ErrUnknownRetryType = errors.New("unknown retry type returned by retry policy")
  9. type ExecutableQuery interface {
  10. execute(conn *Conn) *Iter
  11. attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
  12. retryPolicy() RetryPolicy
  13. GetRoutingKey() ([]byte, error)
  14. Keyspace() string
  15. RetryableQuery
  16. }
  17. type queryExecutor struct {
  18. pool *policyConnPool
  19. policy HostSelectionPolicy
  20. }
  21. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  22. start := time.Now()
  23. iter := qry.execute(conn)
  24. end := time.Now()
  25. qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
  26. return iter
  27. }
  28. // checkRetryPolicy is used by the query executor to determine how a failed query should be handled.
  29. // It consults the query context and the query's retry policy.
  30. func (q *queryExecutor) checkRetryPolicy(rq ExecutableQuery, err error) (RetryType, error) {
  31. if ctx := rq.Context(); ctx != nil {
  32. if ctx.Err() != nil {
  33. return Rethrow, ctx.Err()
  34. }
  35. }
  36. p := rq.retryPolicy()
  37. if p == nil {
  38. return Rethrow, err
  39. }
  40. if p.Attempt(rq) {
  41. return p.GetRetryType(err), nil
  42. }
  43. return p.GetRetryType(err), err
  44. }
  45. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  46. hostIter := q.policy.Pick(qry)
  47. var iter *Iter
  48. outer:
  49. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  50. host := hostResponse.Info()
  51. if host == nil || !host.IsUp() {
  52. continue
  53. }
  54. hostPool, ok := q.pool.getPool(host)
  55. if !ok {
  56. continue
  57. }
  58. conn := hostPool.Pick()
  59. if conn == nil {
  60. continue
  61. }
  62. inner:
  63. for {
  64. iter = q.attemptQuery(qry, conn)
  65. // Update host
  66. hostResponse.Mark(iter.err)
  67. // note host the query was issued against
  68. iter.host = host
  69. // exit if the query was successful
  70. if iter.err == nil {
  71. return iter, nil
  72. }
  73. // consult retry policy on how to proceed
  74. var retryType RetryType
  75. retryType, iter.err = q.checkRetryPolicy(qry, iter.err)
  76. switch retryType {
  77. case Retry:
  78. continue inner
  79. case Rethrow:
  80. return nil, iter.err
  81. case Ignore:
  82. return iter, nil
  83. case RetryNextHost:
  84. continue outer
  85. default:
  86. return nil, ErrUnknownRetryType
  87. }
  88. }
  89. }
  90. // if we reach this point, there is no host in the pool
  91. return nil, ErrNoConnections
  92. }