query_executor.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package gocql
  2. import (
  3. "context"
  4. "time"
  5. )
  6. type ExecutableQuery interface {
  7. execute(ctx context.Context, conn *Conn) *Iter
  8. attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
  9. retryPolicy() RetryPolicy
  10. speculativeExecutionPolicy() SpeculativeExecutionPolicy
  11. GetRoutingKey() ([]byte, error)
  12. Keyspace() string
  13. IsIdempotent() bool
  14. withContext(context.Context) ExecutableQuery
  15. RetryableQuery
  16. }
  17. type queryExecutor struct {
  18. pool *policyConnPool
  19. policy HostSelectionPolicy
  20. }
  21. func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
  22. start := time.Now()
  23. iter := qry.execute(ctx, conn)
  24. end := time.Now()
  25. qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
  26. return iter
  27. }
  28. func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
  29. ticker := time.NewTicker(sp.Delay())
  30. defer ticker.Stop()
  31. for i := 0; i < sp.Attempts(); i++ {
  32. select {
  33. case <-ticker.C:
  34. go q.run(ctx, qry, results)
  35. case <-ctx.Done():
  36. return &Iter{err: ctx.Err()}
  37. case iter := <-results:
  38. return iter
  39. }
  40. }
  41. return nil
  42. }
  43. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  44. // check if the query is not marked as idempotent, if
  45. // it is, we force the policy to NonSpeculative
  46. sp := qry.speculativeExecutionPolicy()
  47. if !qry.IsIdempotent() || sp.Attempts() == 0 {
  48. return q.do(qry.Context(), qry), nil
  49. }
  50. ctx, cancel := context.WithCancel(qry.Context())
  51. defer cancel()
  52. results := make(chan *Iter, 1)
  53. // Launch the main execution
  54. go q.run(ctx, qry, results)
  55. // The speculative executions are launched _in addition_ to the main
  56. // execution, on a timer. So Speculation{2} would make 3 executions running
  57. // in total.
  58. if iter := q.speculate(ctx, qry, sp, results); iter != nil {
  59. return iter, nil
  60. }
  61. select {
  62. case iter := <-results:
  63. return iter, nil
  64. case <-ctx.Done():
  65. return &Iter{err: ctx.Err()}, nil
  66. }
  67. }
  68. func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
  69. hostIter := q.policy.Pick(qry)
  70. selectedHost := hostIter()
  71. rt := qry.retryPolicy()
  72. var lastErr error
  73. var iter *Iter
  74. for selectedHost != nil {
  75. host := selectedHost.Info()
  76. if host == nil || !host.IsUp() {
  77. selectedHost = hostIter()
  78. continue
  79. }
  80. pool, ok := q.pool.getPool(host)
  81. if !ok {
  82. selectedHost = hostIter()
  83. continue
  84. }
  85. conn := pool.Pick()
  86. if conn == nil {
  87. selectedHost = hostIter()
  88. continue
  89. }
  90. iter = q.attemptQuery(ctx, qry, conn)
  91. iter.host = selectedHost.Info()
  92. // Update host
  93. selectedHost.Mark(iter.err)
  94. // Exit if the query was successful
  95. // or no retry policy defined or retry attempts were reached
  96. if iter.err == nil || rt == nil || !rt.Attempt(qry) {
  97. return iter
  98. }
  99. lastErr = iter.err
  100. // If query is unsuccessful, check the error with RetryPolicy to retry
  101. switch rt.GetRetryType(iter.err) {
  102. case Retry:
  103. // retry on the same host
  104. continue
  105. case Rethrow, Ignore:
  106. return iter
  107. case RetryNextHost:
  108. // retry on the next host
  109. selectedHost = hostIter()
  110. continue
  111. default:
  112. // Undefined? Return nil and error, this will panic in the requester
  113. return &Iter{err: ErrUnknownRetryType}
  114. }
  115. }
  116. if lastErr != nil {
  117. return &Iter{err: lastErr}
  118. }
  119. return &Iter{err: ErrNoConnections}
  120. }
  121. func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
  122. select {
  123. case results <- q.do(ctx, qry):
  124. case <-ctx.Done():
  125. }
  126. }