Kaynağa Gözat

DowngradingConsistencyPolicy and enhancements to retry policy behaviour (#1082)

* initial policy setup

* stub for different query retry types and DowngradeConsistencyPolicy

* add implementation for the policy and straighten up the query executor logic for added retry types

* fix pointers

* remove debug messages and added returned error types

* rollback unecessary change

* add unit tests for new policy and returned retry types

* change after running gofmt

* fix print

* fix comment format, fix badly written switch case, remove redundant switch case
Chang Liu 7 yıl önce
ebeveyn
işleme
3540fc649c
4 değiştirilmiş dosya ile 179 ekleme ve 3 silme
  1. 79 0
      policies.go
  2. 59 0
      policies_test.go
  3. 26 2
      query_executor.go
  4. 15 1
      session.go

+ 79 - 0
policies.go

@@ -128,9 +128,19 @@ func (c *cowHostList) remove(ip net.IP) bool {
 // exposes the correct functions for the retry policy logic to evaluate correctly.
 type RetryableQuery interface {
 	Attempts() int
+	SetConsistency(c Consistency)
 	GetConsistency() Consistency
 }
 
+type RetryType uint16
+
+const (
+	Retry         RetryType = 0x00 // retry on same connection
+	RetryNextHost RetryType = 0x01 // retry on another connection
+	Ignore        RetryType = 0x02 // ignore error and return result
+	Rethrow       RetryType = 0x03 // raise error and stop retrying
+)
+
 // RetryPolicy interface is used by gocql to determine if a query can be attempted
 // again after a retryable error has been received. The interface allows gocql
 // users to implement their own logic to determine if a query can be attempted
@@ -140,6 +150,7 @@ type RetryableQuery interface {
 // interface.
 type RetryPolicy interface {
 	Attempt(RetryableQuery) bool
+	GetRetryType(error) RetryType
 }
 
 // SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
@@ -162,6 +173,10 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
 	return q.Attempts() <= s.NumRetries
 }
 
+func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
+	return RetryNextHost
+}
+
 // ExponentialBackoffRetryPolicy sleeps between attempts
 type ExponentialBackoffRetryPolicy struct {
 	NumRetries int
@@ -194,6 +209,70 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time
 	return time.Duration(napDuration)
 }
 
+func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
+	return RetryNextHost
+}
+
+// DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level
+// provided in the slice
+//
+// On a read timeout: the operation is retried with the next provided consistency
+// level.
+//
+// On a write timeout: if the operation is an :attr:`~.UNLOGGED_BATCH`
+// and at least one replica acknowledged the write, the operation is
+// retried with the next consistency level.  Furthermore, for other
+// write types, if at least one replica acknowledged the write, the
+// timeout is ignored.
+//
+// On an unavailable exception: if at least one replica is alive, the
+// operation is retried with the next provided consistency level.
+
+type DowngradingConsistencyRetryPolicy struct {
+	ConsistencyLevelsToTry []Consistency
+}
+
+func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool {
+	currentAttempt := q.Attempts()
+
+	if currentAttempt > len(d.ConsistencyLevelsToTry) {
+		return false
+	} else if currentAttempt > 0 {
+		q.SetConsistency(d.ConsistencyLevelsToTry[currentAttempt-1])
+		if gocqlDebug {
+			Logger.Printf("%T: set consistency to %q\n",
+				d,
+				d.ConsistencyLevelsToTry[currentAttempt-1])
+		}
+	}
+	return true
+}
+
+func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType {
+	switch t := err.(type) {
+	case *RequestErrUnavailable:
+		if t.Alive > 0 {
+			return Retry
+		}
+		return Rethrow
+	case *RequestErrWriteTimeout:
+		if t.WriteType == "SIMPLE" || t.WriteType == "BATCH" || t.WriteType == "COUNTER" {
+			if t.Received > 0 {
+				return Ignore
+			}
+			return Rethrow
+		}
+		if t.WriteType == "UNLOGGED_BATCH" {
+			return Retry
+		}
+		return Rethrow
+	case *RequestErrReadTimeout:
+		return Retry
+	default:
+		return RetryNextHost
+	}
+}
+
 func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
 	return getExponentialTime(e.Min, e.Max, attempts)
 }

+ 59 - 0
policies_test.go

@@ -302,6 +302,65 @@ func TestExponentialBackoffPolicy(t *testing.T) {
 	}
 }
 
+func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
+
+	q := &Query{cons: LocalQuorum}
+
+	rewt0 := &RequestErrWriteTimeout{
+		Received:  0,
+		WriteType: "SIMPLE",
+	}
+
+	rewt1 := &RequestErrWriteTimeout{
+		Received:  1,
+		WriteType: "BATCH",
+	}
+
+	rewt2 := &RequestErrWriteTimeout{
+		WriteType: "UNLOGGED_BATCH",
+	}
+
+	rert := &RequestErrReadTimeout{}
+
+	reu0 := &RequestErrUnavailable{
+		Alive: 0,
+	}
+
+	reu1 := &RequestErrUnavailable{
+		Alive: 1,
+	}
+
+	// this should allow a total of 3 tries.
+	consistencyLevels := []Consistency{Three, Two, One}
+	rt := &DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: consistencyLevels}
+	cases := []struct {
+		attempts  int
+		allow     bool
+		err       error
+		retryType RetryType
+	}{
+		{0, true, rewt0, Rethrow},
+		{3, true, rewt1, Ignore},
+		{1, true, rewt2, Retry},
+		{2, true, rert, Retry},
+		{4, false, reu0, Rethrow},
+		{16, false, reu1, Retry},
+	}
+
+	for _, c := range cases {
+		q.attempts = c.attempts
+		if c.retryType != rt.GetRetryType(c.err) {
+			t.Fatalf("retry type should be %v", c.retryType)
+		}
+		if c.allow && !rt.Attempt(q) {
+			t.Fatalf("should allow retry after %d attempts", c.attempts)
+		}
+		if !c.allow && rt.Attempt(q) {
+			t.Fatalf("should not allow retry after %d attempts", c.attempts)
+		}
+	}
+}
+
 func TestHostPolicy_DCAwareRR(t *testing.T) {
 	p := DCAwareRoundRobinPolicy("local")
 

+ 26 - 2
query_executor.go

@@ -50,17 +50,41 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 		}
 
 		iter = q.attemptQuery(qry, conn)
-
 		// Update host
 		hostResponse.Mark(iter.err)
 
+		if rt == nil {
+			break
+		}
+
+		switch rt.GetRetryType(iter.err) {
+		case Retry:
+			for rt.Attempt(qry) {
+				iter = q.attemptQuery(qry, conn)
+				hostResponse.Mark(iter.err)
+				if iter.err == nil {
+					iter.host = host
+					return iter, nil
+				}
+				if rt.GetRetryType(iter.err) != Retry {
+					break
+				}
+			}
+		case Rethrow:
+			return nil, iter.err
+		case Ignore:
+			return iter, nil
+		case RetryNextHost:
+		default:
+		}
+
 		// Exit for loop if the query was successful
 		if iter.err == nil {
 			iter.host = host
 			return iter, nil
 		}
 
-		if rt == nil || !rt.Attempt(qry) {
+		if !rt.Attempt(qry) {
 			// What do here? Should we just return an error here?
 			break
 		}

+ 15 - 1
session.go

@@ -709,6 +709,11 @@ func (q *Query) GetConsistency() Consistency {
 	return q.cons
 }
 
+// Same as Consistency but without a return value
+func (q *Query) SetConsistency(c Consistency) {
+	q.cons = c
+}
+
 // Trace enables tracing of this query. Look at the documentation of the
 // Tracer interface to learn more about tracing.
 func (q *Query) Trace(trace Tracer) *Query {
@@ -774,6 +779,9 @@ func (q *Query) execute(conn *Conn) *Iter {
 }
 
 func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter) {
+	if gocqlDebug {
+		Logger.Printf("Attempting query: %d", q.attempts)
+	}
 	q.attempts++
 	q.totalLatency += end.Sub(start).Nanoseconds()
 	// TODO: track latencies per host and things as well instead of just total
@@ -1383,7 +1391,7 @@ func (s *Session) NewBatch(typ BatchType) *Batch {
 		Type:             typ,
 		rt:               s.cfg.RetryPolicy,
 		serialCons:       s.cfg.SerialConsistency,
-		observer: s.batchObserver,
+		observer:         s.batchObserver,
 		Cons:             s.cons,
 		defaultTimestamp: s.cfg.DefaultTimestamp,
 		keyspace:         s.cfg.Keyspace,
@@ -1422,6 +1430,12 @@ func (b *Batch) GetConsistency() Consistency {
 	return b.Cons
 }
 
+// SetConsistency sets the currently configured consistency level for the batch
+// operation.
+func (b *Batch) SetConsistency(c Consistency) {
+	b.Cons = c
+}
+
 // Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})