query_executor.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(keyspace string, end, start time.Time, iter *Iter)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. Keyspace() string
  11. RetryableQuery
  12. }
  13. type queryExecutor struct {
  14. pool *policyConnPool
  15. policy HostSelectionPolicy
  16. }
  17. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  18. start := time.Now()
  19. iter := qry.execute(conn)
  20. end := time.Now()
  21. qry.attempt(q.pool.keyspace, end, start, iter)
  22. return iter
  23. }
  24. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  25. rt := qry.retryPolicy()
  26. hostIter := q.policy.Pick(qry)
  27. var iter *Iter
  28. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  29. host := hostResponse.Info()
  30. if host == nil || !host.IsUp() {
  31. continue
  32. }
  33. pool, ok := q.pool.getPool(host)
  34. if !ok {
  35. continue
  36. }
  37. conn := pool.Pick()
  38. if conn == nil {
  39. continue
  40. }
  41. iter = q.attemptQuery(qry, conn)
  42. // Update host
  43. hostResponse.Mark(iter.err)
  44. if rt == nil {
  45. break
  46. }
  47. switch rt.GetRetryType(iter.err) {
  48. case Retry:
  49. for rt.Attempt(qry) {
  50. iter = q.attemptQuery(qry, conn)
  51. hostResponse.Mark(iter.err)
  52. if iter.err == nil {
  53. iter.host = host
  54. return iter, nil
  55. }
  56. if rt.GetRetryType(iter.err) != Retry {
  57. break
  58. }
  59. }
  60. case Rethrow:
  61. return nil, iter.err
  62. case Ignore:
  63. return iter, nil
  64. case RetryNextHost:
  65. default:
  66. }
  67. // Exit for loop if the query was successful
  68. if iter.err == nil {
  69. iter.host = host
  70. return iter, nil
  71. }
  72. if !rt.Attempt(qry) {
  73. // What do here? Should we just return an error here?
  74. break
  75. }
  76. }
  77. if iter == nil {
  78. return nil, ErrNoConnections
  79. }
  80. return iter, nil
  81. }