Browse Source

Merge branch 'master' of github.com:gocql/gocql into 2.1_v2

Ben Hood 11 years ago
parent
commit
e719db9c7a
7 changed files with 112 additions and 29 deletions
  1. 43 12
      cassandra_test.go
  2. 2 1
      conn_test.go
  3. 5 1
      integration.sh
  4. 23 5
      marshal.go
  5. 6 6
      marshal_test.go
  6. 23 2
      policies.go
  7. 10 2
      session.go

+ 43 - 12
cassandra_test.go

@@ -58,7 +58,9 @@ func createCluster() *ClusterConfig {
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = 5 * time.Second
 	cluster.Consistency = Quorum
-	cluster.RetryPolicy.NumRetries = *flagRetry
+	if *flagRetry > 0 {
+		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+	}
 
 	return cluster
 }
@@ -108,7 +110,9 @@ func TestRingDiscovery(t *testing.T) {
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = 5 * time.Second
 	cluster.Consistency = Quorum
-	cluster.RetryPolicy.NumRetries = *flagRetry
+	if *flagRetry > 0 {
+		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+	}
 	cluster.DiscoverHosts = true
 
 	session, err := cluster.CreateSession()
@@ -237,33 +241,60 @@ func TestCAS(t *testing.T) {
 	defer session.Close()
 
 	if err := createTable(session, `CREATE TABLE cas_table (
-			title   varchar,
-			revid   timeuuid,
+			title         varchar,
+			revid   	  timeuuid,
+			last_modified timestamp,
 			PRIMARY KEY (title, revid)
 		)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
-	title, revid := "baz", TimeUUID()
+	title, revid, modified := "baz", TimeUUID(), time.Now()
 	var titleCAS string
 	var revidCAS UUID
+	var modifiedCAS time.Time
 
-	if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
-		VALUES (?, ?) IF NOT EXISTS`,
-		title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
+	if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
+		VALUES (?, ?, ?) IF NOT EXISTS`,
+		title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if !applied {
 		t.Fatal("insert should have been applied")
 	}
 
-	if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
-		VALUES (?, ?) IF NOT EXISTS`,
-		title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
+	if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
+		VALUES (?, ?, ?) IF NOT EXISTS`,
+		title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if applied {
 		t.Fatal("insert should not have been applied")
 	} else if title != titleCAS || revid != revidCAS {
-		t.Fatalf("expected %s/%v but got %s/%v", title, revid, titleCAS, revidCAS)
+		t.Fatalf("expected %s/%v/%v but got %s/%v/%v", title, revid, modified, titleCAS, revidCAS, modifiedCAS)
+	}
+
+	tenSecondsLater := modified.Add(10 * time.Second)
+
+	if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
+		title, revid, tenSecondsLater).ScanCAS(&modifiedCAS); err != nil {
+		t.Fatal("delete:", err)
+	} else if applied {
+		t.Fatal("delete should have not been applied")
+	}
+
+	if modifiedCAS.Unix() != tenSecondsLater.Add(-10*time.Second).Unix() {
+		t.Fatalf("Was expecting modified CAS to be %v; but was one second later", modifiedCAS.UTC())
+	}
+
+	if _, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
+		title, revid, tenSecondsLater).ScanCAS(); err.Error() != "count mismatch" {
+		t.Fatalf("delete: was expecting count mismatch error but got %s", err)
+	}
+
+	if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
+		title, revid, modified).ScanCAS(&modifiedCAS); err != nil {
+		t.Fatal("delete:", err)
+	} else if !applied {
+		t.Fatal("delete should have been applied")
 	}
 }
 

+ 2 - 1
conn_test.go

@@ -85,7 +85,8 @@ func TestQueryRetry(t *testing.T) {
 		<-time.After(5 * time.Second)
 		t.Fatal("no timeout")
 	}()
-	rt := RetryPolicy{NumRetries: 1}
+	rt := &SimpleRetryPolicy{NumRetries: 1}
+
 	qry := db.Query("kill").RetryPolicy(rt)
 	if err := qry.Exec(); err == nil {
 		t.Fatal("expected error")

+ 5 - 1
integration.sh

@@ -7,7 +7,11 @@ function run_tests() {
 	local version=$1
 
 	ccm create test -v binary:$version -n $clusterSize -d --vnodes
-	ccm updateconf 'concurrent_reads: 8' 'concurrent_writes: 32' 'rpc_server_type: sync' 'rpc_min_threads: 2' 'rpc_max_threads: 8' 'write_request_timeout_in_ms: 5000' 'read_request_timeout_in_ms: 5000'
+	
+	sed -i '/#MAX_HEAP_SIZE/c\MAX_HEAP_SIZE="256M"' ~/.ccm/repository/$version/conf/cassandra-env.sh
+	sed -i '/#HEAP_NEWSIZE/c\HEAP_NEWSIZE="100M"' ~/.ccm/repository/$version/conf/cassandra-env.sh
+
+	ccm updateconf 'concurrent_reads: 2' 'concurrent_writes: 2' 'rpc_server_type: sync' 'rpc_min_threads: 2' 'rpc_max_threads: 2' 'write_request_timeout_in_ms: 5000' 'read_request_timeout_in_ms: 5000'
 	ccm start
 	ccm status
 

+ 23 - 5
marshal.go

@@ -1044,15 +1044,24 @@ func unmarshalTimeUUID(info *TypeInfo, data []byte, value interface{}) error {
 }
 
 func marshalInet(info *TypeInfo, value interface{}) ([]byte, error) {
+	// we return either the 4 or 16 byte representation of an
+	// ip address here otherwise the db value will be prefixed
+	// with the remaining byte values e.g. ::ffff:127.0.0.1 and not 127.0.0.1
 	switch val := value.(type) {
 	case net.IP:
-		return val, nil
-	case []byte:
-		return val, nil
+		t := val.To4()
+		if t == nil {
+			return val.To16(), nil
+		}
+		return t, nil
 	case string:
 		b := net.ParseIP(val)
 		if b != nil {
-			return b[:], nil
+			t := b.To4()
+			if t == nil {
+				return b.To16(), nil
+			}
+			return t, nil
 		}
 		return nil, marshalErrorf("cannot marshal. invalid ip string %s", val)
 	}
@@ -1064,7 +1073,12 @@ func unmarshalInet(info *TypeInfo, data []byte, value interface{}) error {
 	case Unmarshaler:
 		return v.UnmarshalCQL(info, data)
 	case *net.IP:
-		*v = net.IP(data)
+		ip := net.IP(data)
+		if v4 := ip.To4(); v4 != nil {
+			*v = v4
+			return nil
+		}
+		*v = ip
 		return nil
 	case *string:
 		if len(data) == 0 {
@@ -1072,6 +1086,10 @@ func unmarshalInet(info *TypeInfo, data []byte, value interface{}) error {
 			return nil
 		}
 		ip := net.IP(data)
+		if v4 := ip.To4(); v4 != nil {
+			*v = v4.String()
+			return nil
+		}
 		*v = ip.String()
 		return nil
 	}

+ 6 - 6
marshal_test.go

@@ -275,22 +275,22 @@ var marshalTests = []struct {
 	},
 	{
 		&TypeInfo{Type: TypeInet},
-		[]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF\x7F\x00\x00\x01"),
-		net.ParseIP("127.0.0.1"),
+		[]byte("\x7F\x00\x00\x01"),
+		net.ParseIP("127.0.0.1").To4(),
 	},
 	{
 		&TypeInfo{Type: TypeInet},
-		[]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF\xFF\xFF\xFF\xFF"),
-		net.ParseIP("255.255.255.255"),
+		[]byte("\xFF\xFF\xFF\xFF"),
+		net.ParseIP("255.255.255.255").To4(),
 	},
 	{
 		&TypeInfo{Type: TypeInet},
-		[]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF\x7F\x00\x00\x01"),
+		[]byte("\x7F\x00\x00\x01"),
 		"127.0.0.1",
 	},
 	{
 		&TypeInfo{Type: TypeInet},
-		[]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF\xFF\xFF\xFF\xFF"),
+		[]byte("\xFF\xFF\xFF\xFF"),
 		"255.255.255.255",
 	},
 	{

+ 23 - 2
policies.go

@@ -4,7 +4,28 @@
 //This file will be the future home for more policies
 package gocql
 
-// RetryPolicy represents the retry behavour for a query.
-type RetryPolicy struct {
+//RetryableQuery is an interface that represents a query or batch statement that
+//exposes the correct functions for the retry policy logic to evaluate correctly.
+type RetryableQuery interface {
+	Attempts() int
+}
+
+// RetryPolicy interace is used by gocql to determine if a query can be attempted
+// again after a retryable error has been received. The interface allows gocql
+// users to implement their own logic to determine if a query can be attempted
+// again.
+// See SimpleRetryPolicy as an example of implementing the RetryPolicy interface.
+type RetryPolicy interface {
+	Attempt(RetryableQuery) bool
+}
+
+// SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
+type SimpleRetryPolicy struct {
 	NumRetries int //Number of times to retry a query
 }
+
+// Attempt tells gocql to attempt the query again based on query.Attempts being less
+// than the NumRetries defined in the policy.
+func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
+	return q.Attempts() <= s.NumRetries
+}

+ 10 - 2
session.go

@@ -136,7 +136,7 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	var iter *Iter
 	qry.attempts = 0
 	qry.totalLatency = 0
-	for qry.attempts <= qry.rt.NumRetries {
+	for {
 		conn := s.Pool.Pick(qry)
 
 		//Assign the error unavailable to the iterator
@@ -154,6 +154,10 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 		if iter.err == nil {
 			break
 		}
+
+		if qry.rt == nil || !qry.rt.Attempt(qry) {
+			break
+		}
 	}
 
 	return iter
@@ -177,7 +181,7 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 	var err error
 	batch.attempts = 0
 	batch.totalLatency = 0
-	for batch.attempts <= batch.rt.NumRetries {
+	for {
 		conn := s.Pool.Pick(nil)
 
 		//Assign the error unavailable and break loop
@@ -193,6 +197,10 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 		if err == nil {
 			return nil
 		}
+
+		if batch.rt == nil || !batch.rt.Attempt(batch) {
+			break
+		}
 	}
 
 	return err