query_executor.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(keyspace string, end, start time.Time, iter *Iter, address string)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. Keyspace() string
  11. RetryableQuery
  12. }
  13. type queryExecutor struct {
  14. pool *policyConnPool
  15. policy HostSelectionPolicy
  16. }
  17. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  18. start := time.Now()
  19. iter := qry.execute(conn)
  20. end := time.Now()
  21. qry.attempt(q.pool.keyspace, end, start, iter, conn.Address())
  22. return iter
  23. }
  24. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  25. rt := qry.retryPolicy()
  26. hostIter := q.policy.Pick(qry)
  27. var iter *Iter
  28. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  29. host := hostResponse.Info()
  30. if host == nil || !host.IsUp() {
  31. continue
  32. }
  33. pool, ok := q.pool.getPool(host)
  34. if !ok {
  35. continue
  36. }
  37. conn := pool.Pick()
  38. if conn == nil {
  39. continue
  40. }
  41. iter = q.attemptQuery(qry, conn)
  42. // Update host
  43. hostResponse.Mark(iter.err)
  44. if rt == nil {
  45. iter.host = host
  46. break
  47. }
  48. switch rt.GetRetryType(iter.err) {
  49. case Retry:
  50. for rt.Attempt(qry) {
  51. iter = q.attemptQuery(qry, conn)
  52. hostResponse.Mark(iter.err)
  53. if iter.err == nil {
  54. iter.host = host
  55. return iter, nil
  56. }
  57. if rt.GetRetryType(iter.err) != Retry {
  58. break
  59. }
  60. }
  61. case Rethrow:
  62. return nil, iter.err
  63. case Ignore:
  64. return iter, nil
  65. case RetryNextHost:
  66. default:
  67. }
  68. // Exit for loop if the query was successful
  69. if iter.err == nil {
  70. iter.host = host
  71. return iter, nil
  72. }
  73. if !rt.Attempt(qry) {
  74. // What do here? Should we just return an error here?
  75. break
  76. }
  77. }
  78. if iter == nil {
  79. return nil, ErrNoConnections
  80. }
  81. return iter, nil
  82. }