query_executor.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(keyspace string, end, start time.Time, iter *Iter)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. Keyspace() string
  11. RetryableQuery
  12. }
  13. type queryExecutor struct {
  14. pool *policyConnPool
  15. policy HostSelectionPolicy
  16. }
  17. func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
  18. start := time.Now()
  19. iter := qry.execute(conn)
  20. end := time.Now()
  21. qry.attempt(q.pool.keyspace, end, start, iter)
  22. return iter
  23. }
  24. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  25. rt := qry.retryPolicy()
  26. hostIter := q.policy.Pick(qry)
  27. var iter *Iter
  28. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  29. host := hostResponse.Info()
  30. if host == nil || !host.IsUp() {
  31. continue
  32. }
  33. pool, ok := q.pool.getPool(host)
  34. if !ok {
  35. continue
  36. }
  37. conn := pool.Pick()
  38. if conn == nil {
  39. continue
  40. }
  41. iter = q.attemptQuery(qry, conn)
  42. // Update host
  43. hostResponse.Mark(iter.err)
  44. // Exit for loop if the query was successful
  45. if iter.err == nil {
  46. iter.host = host
  47. return iter, nil
  48. }
  49. if rt == nil || !rt.Attempt(qry) {
  50. // What do here? Should we just return an error here?
  51. break
  52. }
  53. }
  54. if iter == nil {
  55. return nil, ErrNoConnections
  56. }
  57. return iter, nil
  58. }