Kaynağa Gözat

Merge pull request #246 from phillipCouto/abstract_retries

Abstracted the Retry Policy logic to allow for custom policies.
Ben Hood 11 yıl önce
ebeveyn
işleme
fa7b7a6339
4 değiştirilmiş dosya ile 41 ekleme ve 7 silme
  1. 6 2
      cassandra_test.go
  2. 2 1
      conn_test.go
  3. 23 2
      policies.go
  4. 10 2
      session.go

+ 6 - 2
cassandra_test.go

@@ -58,7 +58,9 @@ func createCluster() *ClusterConfig {
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = 5 * time.Second
 	cluster.Consistency = Quorum
-	cluster.RetryPolicy.NumRetries = *flagRetry
+	if *flagRetry > 0 {
+		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+	}
 
 	return cluster
 }
@@ -108,7 +110,9 @@ func TestRingDiscovery(t *testing.T) {
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = 5 * time.Second
 	cluster.Consistency = Quorum
-	cluster.RetryPolicy.NumRetries = *flagRetry
+	if *flagRetry > 0 {
+		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+	}
 	cluster.DiscoverHosts = true
 
 	session, err := cluster.CreateSession()

+ 2 - 1
conn_test.go

@@ -85,7 +85,8 @@ func TestQueryRetry(t *testing.T) {
 		<-time.After(5 * time.Second)
 		t.Fatal("no timeout")
 	}()
-	rt := RetryPolicy{NumRetries: 1}
+	rt := &SimpleRetryPolicy{NumRetries: 1}
+
 	qry := db.Query("kill").RetryPolicy(rt)
 	if err := qry.Exec(); err == nil {
 		t.Fatal("expected error")

+ 23 - 2
policies.go

@@ -4,7 +4,28 @@
 //This file will be the future home for more policies
 package gocql
 
-// RetryPolicy represents the retry behavour for a query.
-type RetryPolicy struct {
+//RetryableQuery is an interface that represents a query or batch statement that
+//exposes the correct functions for the retry policy logic to evaluate correctly.
+type RetryableQuery interface {
+	Attempts() int
+}
+
+// RetryPolicy interace is used by gocql to determine if a query can be attempted
+// again after a retryable error has been received. The interface allows gocql
+// users to implement their own logic to determine if a query can be attempted
+// again.
+// See SimpleRetryPolicy as an example of implementing the RetryPolicy interface.
+type RetryPolicy interface {
+	Attempt(RetryableQuery) bool
+}
+
+// SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
+type SimpleRetryPolicy struct {
 	NumRetries int //Number of times to retry a query
 }
+
+// Attempt tells gocql to attempt the query again based on query.Attempts being less
+// than the NumRetries defined in the policy.
+func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
+	return q.Attempts() <= s.NumRetries
+}

+ 10 - 2
session.go

@@ -136,7 +136,7 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	var iter *Iter
 	qry.attempts = 0
 	qry.totalLatency = 0
-	for qry.attempts <= qry.rt.NumRetries {
+	for {
 		conn := s.Pool.Pick(qry)
 
 		//Assign the error unavailable to the iterator
@@ -154,6 +154,10 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 		if iter.err == nil {
 			break
 		}
+
+		if qry.rt == nil || !qry.rt.Attempt(qry) {
+			break
+		}
 	}
 
 	return iter
@@ -177,7 +181,7 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 	var err error
 	batch.attempts = 0
 	batch.totalLatency = 0
-	for batch.attempts <= batch.rt.NumRetries {
+	for {
 		conn := s.Pool.Pick(nil)
 
 		//Assign the error unavailable and break loop
@@ -193,6 +197,10 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 		if err == nil {
 			return nil
 		}
+
+		if batch.rt == nil || !batch.rt.Attempt(batch) {
+			break
+		}
 	}
 
 	return err