| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package gocql
- import (
- "context"
- "time"
- )
- type ExecutableQuery interface {
- execute(ctx context.Context, conn *Conn) *Iter
- attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
- retryPolicy() RetryPolicy
- speculativeExecutionPolicy() SpeculativeExecutionPolicy
- GetRoutingKey() ([]byte, error)
- Keyspace() string
- IsIdempotent() bool
- withContext(context.Context) ExecutableQuery
- RetryableQuery
- }
// queryExecutor runs queries on connections taken from pool, trying hosts in
// the order produced by policy.
type queryExecutor struct {
	// pool provides the per-host connection pools and the keyspace that is
	// reported with every attempt.
	pool *policyConnPool
	// policy yields the sequence of candidate hosts for each query.
	policy HostSelectionPolicy
}
- func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
- start := time.Now()
- iter := qry.execute(ctx, conn)
- end := time.Now()
- qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
- return iter
- }
- func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
- ticker := time.NewTicker(sp.Delay())
- defer ticker.Stop()
- for i := 0; i < sp.Attempts(); i++ {
- select {
- case <-ticker.C:
- go q.run(ctx, qry, results)
- case <-ctx.Done():
- return &Iter{err: ctx.Err()}
- case iter := <-results:
- return iter
- }
- }
- return nil
- }
- func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
- // check if the query is not marked as idempotent, if
- // it is, we force the policy to NonSpeculative
- sp := qry.speculativeExecutionPolicy()
- if !qry.IsIdempotent() || sp.Attempts() == 0 {
- return q.do(qry.Context(), qry), nil
- }
- ctx, cancel := context.WithCancel(qry.Context())
- defer cancel()
- results := make(chan *Iter, 1)
- // Launch the main execution
- go q.run(ctx, qry, results)
- // The speculative executions are launched _in addition_ to the main
- // execution, on a timer. So Speculation{2} would make 3 executions running
- // in total.
- if iter := q.speculate(ctx, qry, sp, results); iter != nil {
- return iter, nil
- }
- select {
- case iter := <-results:
- return iter, nil
- case <-ctx.Done():
- return &Iter{err: ctx.Err()}, nil
- }
- }
- func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
- hostIter := q.policy.Pick(qry)
- selectedHost := hostIter()
- rt := qry.retryPolicy()
- var lastErr error
- var iter *Iter
- for selectedHost != nil {
- host := selectedHost.Info()
- if host == nil || !host.IsUp() {
- selectedHost = hostIter()
- continue
- }
- pool, ok := q.pool.getPool(host)
- if !ok {
- selectedHost = hostIter()
- continue
- }
- conn := pool.Pick()
- if conn == nil {
- selectedHost = hostIter()
- continue
- }
- iter = q.attemptQuery(ctx, qry, conn)
- iter.host = selectedHost.Info()
- // Update host
- switch iter.err {
- case context.Canceled, context.DeadlineExceeded, ErrNotFound:
- // those errors represents logical errors, they should not count
- // toward removing a node from the pool
- selectedHost.Mark(nil)
- return iter
- default:
- selectedHost.Mark(iter.err)
- }
- // Exit if the query was successful
- // or no retry policy defined or retry attempts were reached
- if iter.err == nil || rt == nil || !rt.Attempt(qry) {
- return iter
- }
- lastErr = iter.err
- // If query is unsuccessful, check the error with RetryPolicy to retry
- switch rt.GetRetryType(iter.err) {
- case Retry:
- // retry on the same host
- continue
- case Rethrow, Ignore:
- return iter
- case RetryNextHost:
- // retry on the next host
- selectedHost = hostIter()
- continue
- default:
- // Undefined? Return nil and error, this will panic in the requester
- return &Iter{err: ErrUnknownRetryType}
- }
- }
- if lastErr != nil {
- return &Iter{err: lastErr}
- }
- return &Iter{err: ErrNoConnections}
- }
- func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
- select {
- case results <- q.do(ctx, qry):
- case <-ctx.Done():
- }
- }
|