query_executor.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package gocql
  2. import (
  3. "context"
  4. "time"
  5. )
  6. type ExecutableQuery interface {
  7. execute(ctx context.Context, conn *Conn) *Iter
  8. attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
  9. retryPolicy() RetryPolicy
  10. speculativeExecutionPolicy() SpeculativeExecutionPolicy
  11. GetRoutingKey() ([]byte, error)
  12. Keyspace() string
  13. IsIdempotent() bool
  14. withContext(context.Context) ExecutableQuery
  15. RetryableQuery
  16. }
  17. type queryExecutor struct {
  18. pool *policyConnPool
  19. policy HostSelectionPolicy
  20. }
  21. func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
  22. start := time.Now()
  23. iter := qry.execute(ctx, conn)
  24. end := time.Now()
  25. qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
  26. return iter
  27. }
  28. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  29. // check if the query is not marked as idempotent, if
  30. // it is, we force the policy to NonSpeculative
  31. sp := qry.speculativeExecutionPolicy()
  32. if !qry.IsIdempotent() {
  33. sp = NonSpeculativeExecution{}
  34. }
  35. ctx, cancel := context.WithCancel(qry.Context())
  36. defer cancel()
  37. results := make(chan *Iter, 1)
  38. // Launch the main execution
  39. go q.run(ctx, qry, results)
  40. // The speculative executions are launched _in addition_ to the main
  41. // execution, on a timer. So Speculation{2} would make 3 executions running
  42. // in total.
  43. go func() {
  44. // setup a ticker
  45. ticker := time.NewTicker(sp.Delay())
  46. defer ticker.Stop()
  47. for i := 0; i < sp.Attempts(); i++ {
  48. select {
  49. case <-ticker.C:
  50. // Launch the additional execution
  51. go q.run(ctx, qry, results)
  52. case <-ctx.Done():
  53. // not starting additional executions
  54. return
  55. }
  56. }
  57. }()
  58. select {
  59. case iter := <-results:
  60. return iter, nil
  61. case <-ctx.Done():
  62. return &Iter{err: ctx.Err()}, nil
  63. }
  64. }
  65. func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
  66. hostIter := q.policy.Pick(qry)
  67. selectedHost := hostIter()
  68. rt := qry.retryPolicy()
  69. var lastErr error
  70. var iter *Iter
  71. for selectedHost != nil {
  72. host := selectedHost.Info()
  73. if host == nil || !host.IsUp() {
  74. selectedHost = hostIter()
  75. continue
  76. }
  77. pool, ok := q.pool.getPool(host)
  78. if !ok {
  79. selectedHost = hostIter()
  80. continue
  81. }
  82. conn := pool.Pick()
  83. if conn == nil {
  84. selectedHost = hostIter()
  85. continue
  86. }
  87. iter = q.attemptQuery(ctx, qry, conn)
  88. iter.host = selectedHost.Info()
  89. // Update host
  90. selectedHost.Mark(iter.err)
  91. // Exit if the query was successful
  92. // or no retry policy defined or retry attempts were reached
  93. if iter.err == nil || rt == nil || !rt.Attempt(qry) {
  94. return iter
  95. }
  96. lastErr = iter.err
  97. // If query is unsuccessful, check the error with RetryPolicy to retry
  98. switch rt.GetRetryType(iter.err) {
  99. case Retry:
  100. // retry on the same host
  101. continue
  102. case Rethrow, Ignore:
  103. return iter
  104. case RetryNextHost:
  105. // retry on the next host
  106. selectedHost = hostIter()
  107. continue
  108. default:
  109. // Undefined? Return nil and error, this will panic in the requester
  110. return &Iter{err: ErrUnknownRetryType}
  111. }
  112. }
  113. if lastErr != nil {
  114. return &Iter{err: lastErr}
  115. }
  116. return &Iter{err: ErrNoConnections}
  117. }
  118. func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan *Iter) {
  119. select {
  120. case results <- q.do(ctx, qry):
  121. case <-ctx.Done():
  122. }
  123. }