query_executor.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package gocql
  2. import (
  3. "context"
  4. "time"
  5. )
  6. type ExecutableQuery interface {
  7. execute(ctx context.Context, conn *Conn) *Iter
  8. attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
  9. retryPolicy() RetryPolicy
  10. speculativeExecutionPolicy() SpeculativeExecutionPolicy
  11. GetRoutingKey() ([]byte, error)
  12. Keyspace() string
  13. IsIdempotent() bool
  14. withContext(context.Context) ExecutableQuery
  15. RetryableQuery
  16. }
  17. type queryExecutor struct {
  18. pool *policyConnPool
  19. policy HostSelectionPolicy
  20. }
  21. func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
  22. start := time.Now()
  23. iter := qry.execute(ctx, conn)
  24. end := time.Now()
  25. qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
  26. return iter
  27. }
  28. func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
  29. ticker := time.NewTicker(sp.Delay())
  30. defer ticker.Stop()
  31. for i := 0; i < sp.Attempts(); i++ {
  32. select {
  33. case <-ticker.C:
  34. go q.run(ctx, qry, results)
  35. case <-ctx.Done():
  36. return &Iter{err: ctx.Err()}
  37. case iter := <-results:
  38. return iter
  39. }
  40. }
  41. return nil
  42. }
  43. func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
  44. // check if the query is not marked as idempotent, if
  45. // it is, we force the policy to NonSpeculative
  46. sp := qry.speculativeExecutionPolicy()
  47. if !qry.IsIdempotent() || sp.Attempts() == 0 {
  48. return q.do(qry.Context(), qry), nil
  49. }
  50. ctx, cancel := context.WithCancel(qry.Context())
  51. defer cancel()
  52. results := make(chan *Iter, 1)
  53. // Launch the main execution
  54. go q.run(ctx, qry, results)
  55. // The speculative executions are launched _in addition_ to the main
  56. // execution, on a timer. So Speculation{2} would make 3 executions running
  57. // in total.
  58. if iter := q.speculate(ctx, qry, sp, results); iter != nil {
  59. return iter, nil
  60. }
  61. select {
  62. case iter := <-results:
  63. return iter, nil
  64. case <-ctx.Done():
  65. return &Iter{err: ctx.Err()}, nil
  66. }
  67. }
  68. func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
  69. hostIter := q.policy.Pick(qry)
  70. selectedHost := hostIter()
  71. rt := qry.retryPolicy()
  72. var lastErr error
  73. var iter *Iter
  74. for selectedHost != nil {
  75. host := selectedHost.Info()
  76. if host == nil || !host.IsUp() {
  77. selectedHost = hostIter()
  78. continue
  79. }
  80. pool, ok := q.pool.getPool(host)
  81. if !ok {
  82. selectedHost = hostIter()
  83. continue
  84. }
  85. conn := pool.Pick()
  86. if conn == nil {
  87. selectedHost = hostIter()
  88. continue
  89. }
  90. iter = q.attemptQuery(ctx, qry, conn)
  91. iter.host = selectedHost.Info()
  92. // Update host
  93. switch iter.err {
  94. case context.Canceled, context.DeadlineExceeded, ErrNotFound:
  95. // those errors represents logical errors, they should not count
  96. // toward removing a node from the pool
  97. selectedHost.Mark(nil)
  98. return iter
  99. default:
  100. selectedHost.Mark(iter.err)
  101. }
  102. // Exit if the query was successful
  103. // or no retry policy defined or retry attempts were reached
  104. if iter.err == nil || rt == nil || !rt.Attempt(qry) {
  105. return iter
  106. }
  107. lastErr = iter.err
  108. // If query is unsuccessful, check the error with RetryPolicy to retry
  109. switch rt.GetRetryType(iter.err) {
  110. case Retry:
  111. // retry on the same host
  112. continue
  113. case Rethrow, Ignore:
  114. return iter
  115. case RetryNextHost:
  116. // retry on the next host
  117. selectedHost = hostIter()
  118. continue
  119. default:
  120. // Undefined? Return nil and error, this will panic in the requester
  121. return &Iter{err: ErrUnknownRetryType}
  122. }
  123. }
  124. if lastErr != nil {
  125. return &Iter{err: lastErr}
  126. }
  127. return &Iter{err: ErrNoConnections}
  128. }
  129. func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
  130. select {
  131. case results <- q.do(ctx, qry):
  132. case <-ctx.Done():
  133. }
  134. }