Kaynağa Gözat

Provide a way to limit per-query-attempt deadlines (#1151)

* Limit query fetch timeout

* Simplify query executor

* Bail out of Conn.exec if context was canceled/expired

This happens just before anything is put onto the wire.

* Add GetContext method to the RetryableQuery interface

This enables both the query executor and the retry policy to
access the context.

* Always set the host on the iter and exit early if query was successful

There's really no need to evaluate the retry policy if the query
succeeded.

* Rework query executor to respect per attempt timeouts

This adds a private wrapper around the optional retry policy and
handles the query context, which may carry an attempt timeout.

* Collapse context check in retry policy wrapper, rework some comments

* Add docs to exported methods

* Unexport internal method in retry policy wrapper

* Add ourselves to the AUTHORS file

* Replace retryPolicyWrapper struct with checkRetryPolicy func

* Cleanup query attempt test

* Move AttemptTimeout to the retry policy via an optional interface

A retry policy may now implement an `AttemptTimeout` method which is
used to control per query attempt timeouts. It's part of an optional
interface in order for this feature to not break backwards compatibility.

* Unexport `numRetries` field in test retry policy

It's really not necessary for this to be exported.

* Add assertion to query attempt unit test to guard against regressions

This makes sure a number of query attempts are executed.

* Rename `GetContext()` to `Context()` in RetryableQuery interface

This is more in line with Go style.

* Embed `RetryPolicy` interface in extension interface

This doesn't change anything code-wise and is more source documentation
than anything else, but I like the explicitness.

* Rework attempt timeout handling unit test

It now uses the existing slow query test query. I introduced a
dedicated atomic counter for the number of queries done in the
test server (the other counter `nreq` also includes other ops
besides queries, making assertions for query retries brittle
and opaque).

* Rework attempt timeout implementation

This replaces the attempt timeout via `context.WithDeadline` with a
cancelable context and a recycled timer on the query, akin to the
strategy used in #661. The unit test fix is actually a side effect
of the overhead saved by recycling the timer. Previously, each attempt
took ~27ms (on a 25ms attempt timeout) because of the overhead of
`context.WithDeadline`. It's now more like 25.2ms, which occasionally
resulted in a fifth query attempt being made because the 100ms query
context deadline timer fired a millisecond later.
Daniel Lohse 7 yıl önce
ebeveyn
işleme
e48272ffe6
6 değiştirilmiş dosya ile 171 ekleme ve 58 silme
  1. 2 0
      AUTHORS
  2. 41 2
      conn.go
  3. 53 11
      conn_test.go
  4. 11 0
      policies.go
  5. 53 45
      query_executor.go
  6. 11 0
      session.go

+ 2 - 0
AUTHORS

@@ -106,3 +106,5 @@ Chang Liu <changliu.it@gmail.com>
 Ingo Oeser <nightlyone@gmail.com>
 Luke Hines <lukehines@protonmail.com>
 Jacob Greenleaf <jacob@jacobgreenleaf.com>
+Andreas Jaekle <andreas@jaekle.net>
+Daniel Lohse <daniel.lohse@alfatraining.de>

+ 41 - 2
conn.go

@@ -585,6 +585,10 @@ type callReq struct {
 }
 
 func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*framer, error) {
+	if ctx != nil && ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
 	// TODO: move tracer onto conn
 	stream, ok := c.streams.GetStream()
 	if !ok {
@@ -790,6 +794,41 @@ func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error
 }
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
+	ctx := qry.context
+	if rt, ok := qry.rt.(RetryPolicyWithAttemptTimeout); ok && rt.AttemptTimeout() > 0 {
+		if ctx == nil {
+			ctx = context.Background()
+		}
+		var cancel func()
+		ctx, cancel = context.WithCancel(ctx)
+		defer cancel()
+		if qry.attemptTimeoutTimer == nil {
+			qry.attemptTimeoutTimer = time.NewTimer(0)
+			<-qry.attemptTimeoutTimer.C
+		} else {
+			if !qry.attemptTimeoutTimer.Stop() {
+				select {
+				case <-qry.attemptTimeoutTimer.C:
+				default:
+				}
+			}
+		}
+
+		qry.attemptTimeoutTimer.Reset(rt.AttemptTimeout())
+		timeoutCh := qry.attemptTimeoutTimer.C
+
+		go func() {
+			select {
+			case <-ctx.Done():
+				qry.attemptTimeoutTimer.Stop()
+				break
+			case <-timeoutCh:
+				break
+			}
+			cancel()
+		}()
+	}
+
 	params := queryParams{
 		consistency: qry.cons,
 	}
@@ -814,7 +853,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 	if qry.shouldPrepare() {
 		// Prepare all DML queries. Other queries can not be prepared.
 		var err error
-		info, err = c.prepareStatement(qry.context, qry.stmt, qry.trace)
+		info, err = c.prepareStatement(ctx, qry.stmt, qry.trace)
 		if err != nil {
 			return &Iter{err: err}
 		}
@@ -863,7 +902,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 	}
 
-	framer, err := c.exec(qry.context, frame, qry.trace)
+	framer, err := c.exec(ctx, frame, qry.trace)
 	if err != nil {
 		return &Iter{err: err}
 	}

+ 53 - 11
conn_test.go

@@ -15,6 +15,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net"
+	"os"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -277,12 +278,46 @@ func TestTimeout(t *testing.T) {
 	wg.Wait()
 }
 
+type testRetryPolicy struct {
+	numRetries     int // maximum number of times to retry a query
+	attemptTimeout time.Duration
+	t              *testing.T
+}
+
+// Attempt tells gocql to attempt the query again as long as q.Attempts() does
+// not exceed the numRetries defined in the policy.
+func (s *testRetryPolicy) Attempt(q RetryableQuery) bool {
+	return q.Attempts() <= s.numRetries
+}
+
+func (s *testRetryPolicy) GetRetryType(err error) RetryType {
+	return Retry
+}
+
+// AttemptTimeout satisfies the optional RetryPolicyWithAttemptTimeout interface.
+func (s *testRetryPolicy) AttemptTimeout() time.Duration {
+	return s.attemptTimeout
+}
+
+type testQueryObserver struct{}
+
+func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
+	Logger.Printf("Observed query %q. Returned %v rows, took %v on host %q. Error: %q\n", q.Statement, q.Rows, q.End.Sub(q.Start), q.Host.ConnectAddress().String(), q.Err)
+}
+
 // TestQueryRetry will test to make sure that gocql will execute
 // the exact amount of retry queries designated by the user.
 func TestQueryRetry(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	log := &testLogger{}
+	Logger = log
+	defer func() {
+		Logger = &defaultLogger{}
+		os.Stdout.WriteString(log.String())
+	}()
+
 	srv := NewTestServer(t, defaultProto, ctx)
 	defer srv.Stop()
 
@@ -301,22 +336,27 @@ func TestQueryRetry(t *testing.T) {
 		}
 	}()
 
-	rt := &SimpleRetryPolicy{NumRetries: 1}
-
-	qry := db.Query("kill").RetryPolicy(rt)
+	rt := &testRetryPolicy{numRetries: 10, t: t, attemptTimeout: time.Millisecond * 25}
+	queryCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*90)
+	defer cancel()
+	qry := db.Query("slow").RetryPolicy(rt).Observer(&testQueryObserver{}).WithContext(queryCtx)
 	if err := qry.Exec(); err == nil {
 		t.Fatalf("expected error")
 	}
 
-	requests := atomic.LoadInt64(&srv.nKillReq)
-	attempts := qry.Attempts()
-	if requests != int64(attempts) {
-		t.Fatalf("expected requests %v to match query attempts %v", requests, attempts)
-	}
+	// wait for the last slow query to finish
+	// this prevents the test from flaking because of writing to a connection that's been closed
+	time.Sleep(100 * time.Millisecond)
 
-	// the query will only be attempted once, but is being retried
-	if requests != int64(rt.NumRetries) {
-		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, requests-1)
+	numQueries := atomic.LoadUint64(&srv.nQueries)
+
+	// the 90ms timeout allows at most 4 retries
+	if numQueries > 4 {
+		t.Fatalf("Too many retries executed for query. Query executed %v times", numQueries)
+	}
+	// make sure query is retried to guard against regressions
+	if numQueries < 2 {
+		t.Fatalf("Not enough retries executed for query. Query executed %v times", numQueries)
 	}
 }
 
@@ -770,6 +810,7 @@ type TestServer struct {
 	nreq             uint64
 	listen           net.Listener
 	nKillReq         int64
+	nQueries         uint64
 	compressor       Compressor
 
 	protocol   byte
@@ -885,6 +926,7 @@ func (srv *TestServer) process(f *framer) {
 		f.writeHeader(0, opSupported, head.stream)
 		f.writeShort(0)
 	case opQuery:
+		atomic.AddUint64(&srv.nQueries, 1)
 		query := f.readLongString()
 		first := query
 		if n := strings.Index(query, " "); n > 0 {

+ 11 - 0
policies.go

@@ -5,6 +5,7 @@
 package gocql
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"math/rand"
@@ -130,6 +131,7 @@ type RetryableQuery interface {
 	Attempts() int
 	SetConsistency(c Consistency)
 	GetConsistency() Consistency
+	Context() context.Context
 }
 
 type RetryType uint16
@@ -153,6 +155,15 @@ type RetryPolicy interface {
 	GetRetryType(error) RetryType
 }
 
+// RetryPolicyWithAttemptTimeout is an optional interface retry policies can implement
+// in order to control the duration to use before a query attempt is considered
+// as a timeout and will potentially be retried.
+// It's not part of the RetryPolicy interface to remain backwards compatible.
+type RetryPolicyWithAttemptTimeout interface {
+	AttemptTimeout() time.Duration
+	RetryPolicy
+}
+
 // SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
 //
 // See below for examples of usage:

+ 53 - 45
query_executor.go

@@ -1,9 +1,14 @@
 package gocql
 
 import (
+	"errors"
 	"time"
 )
 
+// ErrUnknownRetryType is returned if the retry policy returns a retry type
+// unknown to the query executor.
+var ErrUnknownRetryType = errors.New("unknown retry type returned by retry policy")
+
 type ExecutableQuery interface {
 	execute(conn *Conn) *Iter
 	attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
@@ -28,72 +33,75 @@ func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
 	return iter
 }
 
+// checkRetryPolicy is used by the query executor to determine how a failed query should be handled.
+// It consults the query context and the query's retry policy.
+func (q *queryExecutor) checkRetryPolicy(rq ExecutableQuery, err error) (RetryType, error) {
+	if ctx := rq.Context(); ctx != nil {
+		if ctx.Err() != nil {
+			return Rethrow, ctx.Err()
+		}
+	}
+	p := rq.retryPolicy()
+	if p == nil {
+		return Rethrow, err
+	}
+	if p.Attempt(rq) {
+		return p.GetRetryType(err), nil
+	}
+	return p.GetRetryType(err), err
+}
+
 func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
-	rt := qry.retryPolicy()
 	hostIter := q.policy.Pick(qry)
-
 	var iter *Iter
+
+outer:
 	for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
 		host := hostResponse.Info()
 		if host == nil || !host.IsUp() {
 			continue
 		}
-
-		pool, ok := q.pool.getPool(host)
+		hostPool, ok := q.pool.getPool(host)
 		if !ok {
 			continue
 		}
 
-		conn := pool.Pick()
+		conn := hostPool.Pick()
 		if conn == nil {
 			continue
 		}
+	inner:
+		for {
+			iter = q.attemptQuery(qry, conn)
+			// Update host
+			hostResponse.Mark(iter.err)
 
-		iter = q.attemptQuery(qry, conn)
-		// Update host
-		hostResponse.Mark(iter.err)
-
-		if rt == nil {
+			// note host the query was issued against
 			iter.host = host
-			break
-		}
 
-		switch rt.GetRetryType(iter.err) {
-		case Retry:
-			for rt.Attempt(qry) {
-				iter = q.attemptQuery(qry, conn)
-				hostResponse.Mark(iter.err)
-				if iter.err == nil {
-					iter.host = host
-					return iter, nil
-				}
-				if rt.GetRetryType(iter.err) != Retry {
-					break
-				}
+			// exit if the query was successful
+			if iter.err == nil {
+				return iter, nil
 			}
-		case Rethrow:
-			return nil, iter.err
-		case Ignore:
-			return iter, nil
-		case RetryNextHost:
-		default:
-		}
 
-		// Exit for loop if the query was successful
-		if iter.err == nil {
-			iter.host = host
-			return iter, nil
-		}
-
-		if !rt.Attempt(qry) {
-			// What do here? Should we just return an error here?
-			break
+			// consult retry policy on how to proceed
+			var retryType RetryType
+			retryType, iter.err = q.checkRetryPolicy(qry, iter.err)
+			switch retryType {
+			case Retry:
+				continue inner
+			case Rethrow:
+				return nil, iter.err
+			case Ignore:
+				return iter, nil
+			case RetryNextHost:
+				continue outer
+			default:
+				return nil, ErrUnknownRetryType
+			}
 		}
 	}
 
-	if iter == nil {
-		return nil, ErrNoConnections
-	}
-
-	return iter, nil
+	// if we reach this point, there is no host in the pool
+	return nil, ErrNoConnections
 }

+ 11 - 0
session.go

@@ -681,6 +681,7 @@ type Query struct {
 	disableSkipMetadata   bool
 	context               context.Context
 	idempotent            bool
+	attemptTimeoutTimer   *time.Timer
 
 	disableAutoPage bool
 }
@@ -803,6 +804,11 @@ func (q *Query) WithContext(ctx context.Context) *Query {
 	return q
 }
 
+// Context satisfies the ExecutableQuery interface.
+func (q *Query) Context() context.Context {
+	return q.context
+}
+
 func (q *Query) execute(conn *Conn) *Iter {
 	return conn.executeQuery(q)
 }
@@ -1485,6 +1491,11 @@ func (b *Batch) WithContext(ctx context.Context) *Batch {
 	return b
 }
 
+// Context satisfies the ExecutableQuery interface.
+func (b *Batch) Context() context.Context {
+	return b.context
+}
+
 // Size returns the number of batch statements to be executed by the batch operation.
 func (b *Batch) Size() int {
 	return len(b.Entries)