query_executor.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(time.Duration)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. RetryableQuery
  11. }
  12. type queryExecutor struct {
  13. pool *policyConnPool
  14. policy HostSelectionPolicy
  15. }
  16. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  17. start := time.Now()
  18. iter := qry.execute(conn)
  19. qry.attempt(time.Since(start))
  20. return iter
  21. }
  22. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  23. rt := qry.retryPolicy()
  24. hostIter := q.policy.Pick(qry)
  25. var iter *Iter
  26. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  27. host := hostResponse.Info()
  28. if host == nil || !host.IsUp() {
  29. continue
  30. }
  31. pool, ok := q.pool.getPool(host)
  32. if !ok {
  33. continue
  34. }
  35. conn := pool.Pick()
  36. if conn == nil {
  37. continue
  38. }
  39. iter = q.attemptQuery(qry, conn)
  40. // Update host
  41. hostResponse.Mark(iter.err)
  42. // Exit for loop if the query was successful
  43. if iter.err == nil {
  44. iter.host = host
  45. return iter, nil
  46. }
  47. if rt == nil || !rt.Attempt(qry) {
  48. // What do here? Should we just return an error here?
  49. break
  50. }
  51. }
  52. if iter == nil {
  53. return nil, ErrNoConnections
  54. }
  55. return iter, nil
  56. }