query_executor.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(time.Duration)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. Keyspace() string
  11. RetryableQuery
  12. }
  13. type queryExecutor struct {
  14. pool *policyConnPool
  15. policy HostSelectionPolicy
  16. }
  17. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  18. start := time.Now()
  19. iter := qry.execute(conn)
  20. qry.attempt(time.Since(start))
  21. return iter
  22. }
  23. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  24. rt := qry.retryPolicy()
  25. hostIter := q.policy.Pick(qry)
  26. var iter *Iter
  27. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  28. host := hostResponse.Info()
  29. if host == nil || !host.IsUp() {
  30. continue
  31. }
  32. pool, ok := q.pool.getPool(host)
  33. if !ok {
  34. continue
  35. }
  36. conn := pool.Pick()
  37. if conn == nil {
  38. continue
  39. }
  40. iter = q.attemptQuery(qry, conn)
  41. // Update host
  42. hostResponse.Mark(iter.err)
  43. // Exit for loop if the query was successful
  44. if iter.err == nil {
  45. iter.host = host
  46. return iter, nil
  47. }
  48. if rt == nil || !rt.Attempt(qry) {
  49. // What do here? Should we just return an error here?
  50. break
  51. }
  52. }
  53. if iter == nil {
  54. return nil, ErrNoConnections
  55. }
  56. return iter, nil
  57. }