query_executor.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. Keyspace() string
  11. Cancel()
  12. RetryableQuery
  13. }
  14. type queryExecutor struct {
  15. pool *policyConnPool
  16. policy HostSelectionPolicy
  17. }
  18. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  19. start := time.Now()
  20. iter := qry.execute(conn)
  21. end := time.Now()
  22. qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
  23. return iter
  24. }
  25. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  26. rt := qry.retryPolicy()
  27. hostIter := q.policy.Pick(qry)
  28. var iter *Iter
  29. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  30. host := hostResponse.Info()
  31. if host == nil || !host.IsUp() {
  32. continue
  33. }
  34. pool, ok := q.pool.getPool(host)
  35. if !ok {
  36. continue
  37. }
  38. conn := pool.Pick()
  39. if conn == nil {
  40. continue
  41. }
  42. iter = q.attemptQuery(qry, conn)
  43. // Update host
  44. hostResponse.Mark(iter.err)
  45. if rt == nil {
  46. iter.host = host
  47. break
  48. }
  49. switch rt.GetRetryType(iter.err) {
  50. case Retry:
  51. for rt.Attempt(qry) {
  52. iter = q.attemptQuery(qry, conn)
  53. hostResponse.Mark(iter.err)
  54. if iter.err == nil {
  55. iter.host = host
  56. return iter, nil
  57. }
  58. if rt.GetRetryType(iter.err) != Retry {
  59. break
  60. }
  61. }
  62. case Rethrow:
  63. return nil, iter.err
  64. case Ignore:
  65. return iter, nil
  66. case RetryNextHost:
  67. default:
  68. }
  69. // Exit for loop if the query was successful
  70. if iter.err == nil {
  71. iter.host = host
  72. return iter, nil
  73. }
  74. if !rt.Attempt(qry) {
  75. // What do here? Should we just return an error here?
  76. break
  77. }
  78. }
  79. if iter == nil {
  80. return nil, ErrNoConnections
  81. }
  82. return iter, nil
  83. }