query_executor.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package gocql
  2. import (
  3. "time"
  4. )
  5. type ExecutableQuery interface {
  6. execute(conn *Conn) *Iter
  7. attempt(time.Duration)
  8. retryPolicy() RetryPolicy
  9. GetRoutingKey() ([]byte, error)
  10. RetryableQuery
  11. }
  12. type queryExecutor struct {
  13. pool *policyConnPool
  14. policy HostSelectionPolicy
  15. }
  16. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  17. rt := qry.retryPolicy()
  18. hostIter := q.policy.Pick(qry)
  19. var iter *Iter
  20. for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
  21. host := hostResponse.Info()
  22. if !host.IsUp() {
  23. continue
  24. }
  25. pool, ok := q.pool.getPool(host.Peer())
  26. if !ok {
  27. continue
  28. }
  29. conn := pool.Pick()
  30. if conn == nil {
  31. continue
  32. }
  33. start := time.Now()
  34. iter = qry.execute(conn)
  35. qry.attempt(time.Since(start))
  36. // Update host
  37. hostResponse.Mark(iter.err)
  38. // Exit for loop if the query was successful
  39. if iter.err == nil {
  40. return iter, nil
  41. }
  42. if rt == nil || !rt.Attempt(qry) {
  43. // What do here? Should we just return an error here?
  44. break
  45. }
  46. }
  47. if iter == nil {
  48. return nil, ErrNoConnections
  49. }
  50. return iter, nil
  51. }