Преглед изворни кода

Merge pull request #353 from Zariel/serial-consistency

Add with serial consistency support
Ben Hood пре 10 година
родитељ
комит
61d083e439
6 измењених фајлова са 90 додато и 45 уклоњено
  1. 1 0
      cassandra_test.go
  2. 19 18
      cluster.go
  3. 8 4
      conn.go
  4. 34 20
      frame.go
  5. 26 2
      session.go
  6. 2 1
      session_test.go

+ 1 - 0
cassandra_test.go

@@ -279,6 +279,7 @@ func TestCAS(t *testing.T) {
 
 	session := createSession(t)
 	defer session.Close()
+	session.cfg.SerialConsistency = LocalSerial
 
 	if err := createTable(session, `CREATE TABLE cas_table (
 			title         varchar,

+ 19 - 18
cluster.go

@@ -55,24 +55,25 @@ type DiscoveryConfig struct {
 // behavior to fit the most common use cases. Applications that requre a
 // different setup must implement their own cluster.
 type ClusterConfig struct {
-	Hosts             []string      // addresses for the initial connections
-	CQLVersion        string        // CQL version (default: 3.0.0)
-	ProtoVersion      int           // version of the native protocol (default: 2)
-	Timeout           time.Duration // connection timeout (default: 600ms)
-	Port              int           // port (default: 9042)
-	Keyspace          string        // initial keyspace (optional)
-	NumConns          int           // number of connections per host (default: 2)
-	NumStreams        int           // number of streams per connection (default: max per protocol, either 128 or 32768)
-	Consistency       Consistency   // default consistency level (default: Quorum)
-	Compressor        Compressor    // compression algorithm (default: nil)
-	Authenticator     Authenticator // authenticator (default: nil)
-	RetryPolicy       RetryPolicy   // Default retry policy to use for queries (default: 0)
-	SocketKeepalive   time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
-	ConnPoolType      NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
-	DiscoverHosts     bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
-	MaxPreparedStmts  int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
-	MaxRoutingKeyInfo int           // Sets the maximum cache size for query info about statements for each session (default: 1000)
-	PageSize          int           // Default page size to use for created sessions (default: 0)
+	Hosts             []string          // addresses for the initial connections
+	CQLVersion        string            // CQL version (default: 3.0.0)
+	ProtoVersion      int               // version of the native protocol (default: 2)
+	Timeout           time.Duration     // connection timeout (default: 600ms)
+	Port              int               // port (default: 9042)
+	Keyspace          string            // initial keyspace (optional)
+	NumConns          int               // number of connections per host (default: 2)
+	NumStreams        int               // number of streams per connection (default: max per protocol, either 128 or 32768)
+	Consistency       Consistency       // default consistency level (default: Quorum)
+	Compressor        Compressor        // compression algorithm (default: nil)
+	Authenticator     Authenticator     // authenticator (default: nil)
+	RetryPolicy       RetryPolicy       // Default retry policy to use for queries (default: 0)
+	SocketKeepalive   time.Duration     // The keepalive period to use, enabled if > 0 (default: 0)
+	ConnPoolType      NewPoolFunc       // The function used to create the connection pool for the session (default: NewSimplePool)
+	DiscoverHosts     bool              // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
+	MaxPreparedStmts  int               // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	MaxRoutingKeyInfo int               // Sets the maximum cache size for query info about statements for each session (default: 1000)
+	PageSize          int               // Default page size to use for created sessions (default: 0)
+	SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
 	Discovery         DiscoveryConfig
 	SslOpts           *SslOptions
 }

+ 8 - 4
conn.go

@@ -434,7 +434,10 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		consistency: qry.cons,
 	}
 
-	// TODO: Add DefaultTimestamp, SerialConsistency
+	// frame checks that it is not 0
+	params.serialConsistency = qry.serialCons
+
+	// TODO: Add DefaultTimestamp
 	if len(qry.pageState) > 0 {
 		params.pagingState = qry.pageState
 	}
@@ -602,9 +605,10 @@ func (c *Conn) executeBatch(batch *Batch) error {
 
 	n := len(batch.Entries)
 	req := &writeBatchFrame{
-		typ:         batch.Type,
-		statements:  make([]batchStatment, n),
-		consistency: batch.Cons,
+		typ:               batch.Type,
+		statements:        make([]batchStatment, n),
+		consistency:       batch.Cons,
+		serialConsistency: batch.serialCons,
 	}
 
 	stmts := make(map[string]string)

+ 34 - 20
frame.go

@@ -135,16 +135,14 @@ type Consistency uint16
 
 const (
 	Any         Consistency = 0x00
-	One                     = 0x01
-	Two                     = 0x02
-	Three                   = 0x03
-	Quorum                  = 0x04
-	All                     = 0x05
-	LocalQuorum             = 0x06
-	EachQuorum              = 0x07
-	Serial                  = 0x08
-	LocalSerial             = 0x09
-	LocalOne                = 0x0A
+	One         Consistency = 0x01
+	Two         Consistency = 0x02
+	Three       Consistency = 0x03
+	Quorum      Consistency = 0x04
+	All         Consistency = 0x05
+	LocalQuorum Consistency = 0x06
+	EachQuorum  Consistency = 0x07
+	LocalOne    Consistency = 0x0A
 )
 
 func (c Consistency) String() string {
@@ -165,14 +163,28 @@ func (c Consistency) String() string {
 		return "LOCAL_QUORUM"
 	case EachQuorum:
 		return "EACH_QUORUM"
+	case LocalOne:
+		return "LOCAL_ONE"
+	default:
+		return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
+	}
+}
+
+type SerialConsistency uint16
+
+const (
+	Serial      SerialConsistency = 0x08
+	LocalSerial SerialConsistency = 0x09
+)
+
+func (s SerialConsistency) String() string {
+	switch s {
 	case Serial:
 		return "SERIAL"
 	case LocalSerial:
 		return "LOCAL_SERIAL"
-	case LocalOne:
-		return "LOCAL_ONE"
 	default:
-		return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
+		return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
 	}
 }
 
@@ -904,7 +916,7 @@ type queryParams struct {
 	values            []queryValues
 	pageSize          int
 	pagingState       []byte
-	serialConsistency Consistency
+	serialConsistency SerialConsistency
 	// v3+
 	timestamp *time.Time
 }
@@ -972,7 +984,7 @@ func (f *framer) writeQueryParams(opts *queryParams) {
 	}
 
 	if opts.serialConsistency > 0 {
-		f.writeConsistency(opts.serialConsistency)
+		f.writeConsistency(Consistency(opts.serialConsistency))
 	}
 
 	if f.proto > protoVersion2 && opts.timestamp != nil {
@@ -1048,10 +1060,12 @@ type batchStatment struct {
 }
 
 type writeBatchFrame struct {
-	typ               BatchType
-	statements        []batchStatment
-	consistency       Consistency
-	serialConsistency Consistency
+	typ         BatchType
+	statements  []batchStatment
+	consistency Consistency
+
+	// v3+
+	serialConsistency SerialConsistency
 	defaultTimestamp  bool
 }
 
@@ -1104,7 +1118,7 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame) error {
 		f.writeByte(flags)
 
 		if w.serialConsistency > 0 {
-			f.writeConsistency(w.serialConsistency)
+			f.writeConsistency(Consistency(w.serialConsistency))
 		}
 		if w.defaultTimestamp {
 			now := time.Now().UnixNano() / 1000

+ 26 - 2
session.go

@@ -96,7 +96,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	s.mu.RLock()
 	qry := &Query{stmt: stmt, values: values, cons: s.cons,
 		session: s, pageSize: s.pageSize, trace: s.trace,
-		prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
+		prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency}
 	s.mu.RUnlock()
 	return qry
 }
@@ -372,6 +372,7 @@ type Query struct {
 	binding      func(q *QueryInfo) ([]interface{}, error)
 	attempts     int
 	totalLatency int64
+	serialCons   SerialConsistency
 }
 
 //Attempts returns the number of times the query was executed.
@@ -517,6 +518,16 @@ func (q *Query) Bind(v ...interface{}) *Query {
 	return q
 }
 
+// SerialConsistency sets the consistencyc level for the
+// serial phase of conditional updates. That consitency can only be
+// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+// SERIAL. This option will be ignored for anything else that a
+// conditional update/insert.
+func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
+	q.serialCons = cons
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.Iter()
@@ -713,6 +724,7 @@ type Batch struct {
 	rt           RetryPolicy
 	attempts     int
 	totalLatency int64
+	serialCons   SerialConsistency
 }
 
 // NewBatch creates a new batch operation without defaults from the cluster
@@ -722,7 +734,7 @@ func NewBatch(typ BatchType) *Batch {
 
 // NewBatch creates a new batch operation using defaults defined in the cluster
 func (s *Session) NewBatch(typ BatchType) *Batch {
-	return &Batch{Type: typ, rt: s.cfg.RetryPolicy}
+	return &Batch{Type: typ, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency}
 }
 
 // Attempts returns the number of attempts made to execute the batch.
@@ -767,6 +779,18 @@ func (b *Batch) Size() int {
 	return len(b.Entries)
 }
 
+// SerialConsistency sets the consistencyc level for the
+// serial phase of conditional updates. That consitency can only be
+// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+// SERIAL. This option will be ignored for anything else that a
+// conditional update/insert.
+//
+// Only available for protocol 3 and above
+func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
+	b.serialCons = cons
+	return b
+}
+
 type BatchType byte
 
 const (

+ 2 - 1
session_test.go

@@ -3,6 +3,7 @@
 package gocql
 
 import (
+	"fmt"
 	"testing"
 )
 
@@ -221,7 +222,7 @@ func TestBatchBasicAPI(t *testing.T) {
 }
 
 func TestConsistencyNames(t *testing.T) {
-	names := map[Consistency]string{
+	names := map[fmt.Stringer]string{
 		Any:         "ANY",
 		One:         "ONE",
 		Two:         "TWO",