浏览代码

Added retry policies for Query and Batch Statements. Policies can be defined at the cluster or per query. Added test cases for retries and added test case for batch statement limits.

Phillip Couto 11 年之前
父节点
当前提交
e2264bb7f8
共有 5 个文件被更改,包括 136 次插入19 次删除
  1. 23 0
      cassandra_test.go
  2. 1 0
      cluster.go
  3. 32 4
      conn_test.go
  4. 10 0
      policies.go
  5. 70 15
      session.go

+ 23 - 0
cassandra_test.go

@@ -249,6 +249,29 @@ func TestBatch(t *testing.T) {
 	}
 }
 
+// TestBatchLimit tests gocql to make sure batch operations larger than the maximum
+// statement limit are not submitted to a cassandra node.
+func TestBatchLimit(t *testing.T) {
+	if *flagProto == 1 {
+		t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
+	}
+	session := createSession(t)
+	defer session.Close()
+
+	if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
+		t.Fatal("create table:", err)
+	}
+
+	batch := NewBatch(LoggedBatch)
+	for i := 0; i < 65537; i++ {
+		batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
+	}
+	if err := session.ExecuteBatch(batch); err == nil {
+		t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
+	}
+
+}
+
 type Page struct {
 	Title       string
 	RevId       UUID

+ 1 - 0
cluster.go

@@ -32,6 +32,7 @@ type ClusterConfig struct {
 	Consistency   Consistency   // default consistency level (default: Quorum)
 	Compressor    Compressor    // compression algorithm (default: nil)
 	Authenticator Authenticator // authenticator (default: nil)
+	RetryPolicy   RetryPolicy   // Default retry policy to use for queries(default:0)
 }
 
 // NewCluster generates a new config for the default cluster implementation.

+ 32 - 4
conn_test.go

@@ -15,10 +15,11 @@ import (
 )
 
 type TestServer struct {
-	Address string
-	t       *testing.T
-	nreq    uint64
-	listen  net.Listener
+	Address  string
+	t        *testing.T
+	nreq     uint64
+	listen   net.Listener
+	nKillReq uint64
 }
 
 func TestSimple(t *testing.T) {
@@ -69,6 +70,32 @@ func TestTimeout(t *testing.T) {
 	}
 }
 
+// TestQueryRetry will test to make sure that gocql will execute
+// the exact amount of retry queries designated by the user.
+func TestQueryRetry(t *testing.T) {
+	srv := NewTestServer(t)
+	defer srv.Stop()
+
+	db, err := NewCluster(srv.Address).CreateSession()
+	if err != nil {
+		t.Errorf("NewCluster: %v", err)
+	}
+
+	go func() {
+		<-time.After(5 * time.Second)
+		t.Fatal("no timeout")
+	}()
+	rt := RetryPolicy{NumRetries: 1}
+
+	if err := db.Query("kill").RetryPolicy(rt).Exec(); err == nil {
+		t.Fatal("expected error")
+	}
+	//Minus 1 from the nKillReq variable since there is the initial query attempt
+	if srv.nKillReq-1 != uint64(rt.NumRetries) {
+		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, srv.nKillReq-1)
+	}
+}
+
 func TestSlowQuery(t *testing.T) {
 	srv := NewTestServer(t)
 	defer srv.Stop()
@@ -183,6 +210,7 @@ func (srv *TestServer) process(frame frame, conn net.Conn) {
 		}
 		switch strings.ToLower(first) {
 		case "kill":
+			atomic.AddUint64(&srv.nKillReq, 1)
 			select {}
 		case "slow":
 			go func() {

+ 10 - 0
policies.go

@@ -0,0 +1,10 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//This file will be the future home for more policies
+package gocql
+
+// RetryPolicy represents the retry behavour for a query.
+type RetryPolicy struct {
+	NumRetries int //Number of times to retry a query
+}

+ 70 - 15
session.go

@@ -28,11 +28,12 @@ type Session struct {
 	prefetch float64
 	trace    Tracer
 	mu       sync.RWMutex
+	cfg      ClusterConfig
 }
 
 // NewSession wraps an existing Node.
-func NewSession(node Node) *Session {
-	return &Session{Node: node, cons: Quorum, prefetch: 0.25}
+func NewSession(c *clusterImpl) *Session {
+	return &Session{Node: c, cons: Quorum, prefetch: 0.25, cfg: c.cfg}
 }
 
 // SetConsistency sets the default consistency level for this session. This
@@ -77,7 +78,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	s.mu.RLock()
 	qry := &Query{stmt: stmt, values: values, cons: s.cons,
 		session: s, pageSize: s.pageSize, trace: s.trace,
-		prefetch: s.prefetch}
+		prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
 	s.mu.RUnlock()
 	return qry
 }
@@ -89,19 +90,51 @@ func (s *Session) Close() {
 }
 
 func (s *Session) executeQuery(qry *Query) *Iter {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return &Iter{err: ErrUnavailable}
+	itr := &Iter{}
+	count := 0
+	for count <= qry.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable to the iterator
+		if conn == nil {
+			itr.err = ErrUnavailable
+			break
+		}
+		itr = conn.executeQuery(qry)
+		//Exit for loop if the query was successful
+		if itr.err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeQuery(qry)
+	return itr
 }
 
+// ExecuteBatch executes a batch operation and returns nil if successful
+// otherwise an error is returned describing the failure.
 func (s *Session) ExecuteBatch(batch *Batch) error {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return ErrUnavailable
+	// Prevent the execution of the batch if greater than the limit
+	// Currently batches have a limit of 65536 queries.
+	// https://datastax-oss.atlassian.net/browse/JAVA-229
+	if len(batch.Entries) > 65536 {
+		return ErrTooManyStmts
+	}
+	var err error
+	count := 0
+	for count <= batch.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable and break loop
+		if conn == nil {
+			err = ErrUnavailable
+			break
+		}
+		err = conn.executeBatch(batch)
+		//Exit loop if operation executed correctly
+		if err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeBatch(batch)
+	return err
 }
 
 // Query represents a CQL statement that can be executed.
@@ -114,6 +147,7 @@ type Query struct {
 	prefetch  float64
 	trace     Tracer
 	session   *Session
+	rt        RetryPolicy
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -148,6 +182,12 @@ func (q *Query) Prefetch(p float64) *Query {
 	return q
 }
 
+// RetryPolicy sets the policy to use when retrying the query.
+func (q *Query) RetryPolicy(r RetryPolicy) *Query {
+	q.rt = r
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.session.executeQuery(q)
@@ -273,16 +313,30 @@ type Batch struct {
 	Type    BatchType
 	Entries []BatchEntry
 	Cons    Consistency
+	rt      RetryPolicy
 }
 
+// NewBatch creates a new batch operation without defaults from the cluster
 func NewBatch(typ BatchType) *Batch {
 	return &Batch{Type: typ}
 }
 
+// NewBatch creates a new batch operation using defaults defined in the cluster
+func (s *Session) NewBatch(typ BatchType) *Batch {
+	return &Batch{Type: typ, rt: s.cfg.RetryPolicy}
+}
+
+// Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
 }
 
+// RetryPolicy sets the retry policy to use when executing the batch operation
+func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
+	b.rt = r
+	return b
+}
+
 type BatchType int
 
 const (
@@ -400,8 +454,9 @@ func (e Error) Error() string {
 }
 
 var (
-	ErrNotFound    = errors.New("not found")
-	ErrUnavailable = errors.New("unavailable")
-	ErrProtocol    = errors.New("protocol error")
-	ErrUnsupported = errors.New("feature not supported")
+	ErrNotFound     = errors.New("not found")
+	ErrUnavailable  = errors.New("unavailable")
+	ErrProtocol     = errors.New("protocol error")
+	ErrUnsupported  = errors.New("feature not supported")
+	ErrTooManyStmts = errors.New("too many statements")
 )