Browse Source

Merge pull request #112 from phillipCouto/retrypolicy

Added retry policies for Query and Batch Statements.
Phillip Couto 11 years ago
parent
commit
0e1ae5c0f6
5 changed files with 136 additions and 19 deletions
  1. 23 0
      cassandra_test.go
  2. 1 0
      cluster.go
  3. 32 4
      conn_test.go
  4. 10 0
      policies.go
  5. 70 15
      session.go

+ 23 - 0
cassandra_test.go

@@ -249,6 +249,29 @@ func TestBatch(t *testing.T) {
 	}
 }
 
+// TestBatchLimit tests gocql to make sure batch operations larger than the maximum
+// statement limit are not submitted to a cassandra node.
+func TestBatchLimit(t *testing.T) {
+	if *flagProto == 1 {
+		t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
+	}
+	session := createSession(t)
+	defer session.Close()
+
+	if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
+		t.Fatal("create table:", err)
+	}
+
+	batch := NewBatch(LoggedBatch)
+	for i := 0; i < 65537; i++ {
+		batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
+	}
+	if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
+		t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
+	}
+
+}
+
 type Page struct {
 	Title       string
 	RevId       UUID

+ 1 - 0
cluster.go

@@ -32,6 +32,7 @@ type ClusterConfig struct {
 	Consistency   Consistency   // default consistency level (default: Quorum)
 	Compressor    Compressor    // compression algorithm (default: nil)
 	Authenticator Authenticator // authenticator (default: nil)
+	RetryPolicy   RetryPolicy   // Default retry policy to use for queries(default:0)
 }
 
 // NewCluster generates a new config for the default cluster implementation.

+ 32 - 4
conn_test.go

@@ -15,10 +15,11 @@ import (
 )
 
 type TestServer struct {
-	Address string
-	t       *testing.T
-	nreq    uint64
-	listen  net.Listener
+	Address  string
+	t        *testing.T
+	nreq     uint64
+	listen   net.Listener
+	nKillReq uint64
 }
 
 func TestSimple(t *testing.T) {
@@ -69,6 +70,32 @@ func TestTimeout(t *testing.T) {
 	}
 }
 
+// TestQueryRetry will test to make sure that gocql will execute
+// the exact amount of retry queries designated by the user.
+func TestQueryRetry(t *testing.T) {
+	srv := NewTestServer(t)
+	defer srv.Stop()
+
+	db, err := NewCluster(srv.Address).CreateSession()
+	if err != nil {
+		t.Errorf("NewCluster: %v", err)
+	}
+
+	go func() {
+		<-time.After(5 * time.Second)
+		t.Fatal("no timeout")
+	}()
+	rt := RetryPolicy{NumRetries: 1}
+
+	if err := db.Query("kill").RetryPolicy(rt).Exec(); err == nil {
+		t.Fatal("expected error")
+	}
+	//Minus 1 from the nKillReq variable since there is the initial query attempt
+	if srv.nKillReq-1 != uint64(rt.NumRetries) {
+		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, srv.nKillReq-1)
+	}
+}
+
 func TestSlowQuery(t *testing.T) {
 	srv := NewTestServer(t)
 	defer srv.Stop()
@@ -183,6 +210,7 @@ func (srv *TestServer) process(frame frame, conn net.Conn) {
 		}
 		switch strings.ToLower(first) {
 		case "kill":
+			atomic.AddUint64(&srv.nKillReq, 1)
 			select {}
 		case "slow":
 			go func() {

+ 10 - 0
policies.go

@@ -0,0 +1,10 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//This file will be the future home for more policies
+package gocql
+
+// RetryPolicy represents the retry behavour for a query.
+type RetryPolicy struct {
+	NumRetries int //Number of times to retry a query
+}

+ 70 - 15
session.go

@@ -28,11 +28,12 @@ type Session struct {
 	prefetch float64
 	trace    Tracer
 	mu       sync.RWMutex
+	cfg      ClusterConfig
 }
 
 // NewSession wraps an existing Node.
-func NewSession(node Node) *Session {
-	return &Session{Node: node, cons: Quorum, prefetch: 0.25}
+func NewSession(c *clusterImpl) *Session {
+	return &Session{Node: c, cons: Quorum, prefetch: 0.25, cfg: c.cfg}
 }
 
 // SetConsistency sets the default consistency level for this session. This
@@ -77,7 +78,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	s.mu.RLock()
 	qry := &Query{stmt: stmt, values: values, cons: s.cons,
 		session: s, pageSize: s.pageSize, trace: s.trace,
-		prefetch: s.prefetch}
+		prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
 	s.mu.RUnlock()
 	return qry
 }
@@ -89,19 +90,51 @@ func (s *Session) Close() {
 }
 
 func (s *Session) executeQuery(qry *Query) *Iter {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return &Iter{err: ErrUnavailable}
+	var itr *Iter
+	count := 0
+	for count <= qry.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable to the iterator
+		if conn == nil {
+			itr = &Iter{err: ErrUnavailable}
+			break
+		}
+		itr = conn.executeQuery(qry)
+		//Exit for loop if the query was successful
+		if itr.err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeQuery(qry)
+	return itr
 }
 
+// ExecuteBatch executes a batch operation and returns nil if successful
+// otherwise an error is returned describing the failure.
 func (s *Session) ExecuteBatch(batch *Batch) error {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return ErrUnavailable
+	// Prevent the execution of the batch if greater than the limit
+	// Currently batches have a limit of 65536 queries.
+	// https://datastax-oss.atlassian.net/browse/JAVA-229
+	if len(batch.Entries) > 65536 {
+		return ErrTooManyStmts
+	}
+	var err error
+	count := 0
+	for count <= batch.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable and break loop
+		if conn == nil {
+			err = ErrUnavailable
+			break
+		}
+		err = conn.executeBatch(batch)
+		//Exit loop if operation executed correctly
+		if err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeBatch(batch)
+	return err
 }
 
 // Query represents a CQL statement that can be executed.
@@ -114,6 +147,7 @@ type Query struct {
 	prefetch  float64
 	trace     Tracer
 	session   *Session
+	rt        RetryPolicy
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -148,6 +182,12 @@ func (q *Query) Prefetch(p float64) *Query {
 	return q
 }
 
+// RetryPolicy sets the policy to use when retrying the query.
+func (q *Query) RetryPolicy(r RetryPolicy) *Query {
+	q.rt = r
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.session.executeQuery(q)
@@ -273,16 +313,30 @@ type Batch struct {
 	Type    BatchType
 	Entries []BatchEntry
 	Cons    Consistency
+	rt      RetryPolicy
 }
 
+// NewBatch creates a new batch operation without defaults from the cluster
 func NewBatch(typ BatchType) *Batch {
 	return &Batch{Type: typ}
 }
 
+// NewBatch creates a new batch operation using defaults defined in the cluster
+func (s *Session) NewBatch(typ BatchType) *Batch {
+	return &Batch{Type: typ, rt: s.cfg.RetryPolicy}
+}
+
+// Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
 }
 
+// RetryPolicy sets the retry policy to use when executing the batch operation
+func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
+	b.rt = r
+	return b
+}
+
 type BatchType int
 
 const (
@@ -400,8 +454,9 @@ func (e Error) Error() string {
 }
 
 var (
-	ErrNotFound    = errors.New("not found")
-	ErrUnavailable = errors.New("unavailable")
-	ErrProtocol    = errors.New("protocol error")
-	ErrUnsupported = errors.New("feature not supported")
+	ErrNotFound     = errors.New("not found")
+	ErrUnavailable  = errors.New("unavailable")
+	ErrProtocol     = errors.New("protocol error")
+	ErrUnsupported  = errors.New("feature not supported")
+	ErrTooManyStmts = errors.New("too many statements")
 )