瀏覽代碼

Token aware batches (#1313)

* mod: proper modules files

* batch: add token awareness

The first batch entry is used to calculate the token.
This gives users a possibility to group their batches
and get the benefit of token awareness.
Henrik Johansson 6 年之前
父節點
當前提交
aa8c9ac52f
共有 4 個文件被更改,包括 97 次插入55 次删除
  1. 4 0
      go.mod
  2. 0 3
      go.modverify
  3. 22 0
      go.sum
  4. 71 52
      session.go

+ 4 - 0
go.mod

@@ -1,7 +1,11 @@
 module github.com/gocql/gocql
 
 require (
+	github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
+	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 	github.com/golang/snappy v0.0.0-20170215233205-553a64147049
 	github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
+	github.com/kr/pretty v0.1.0 // indirect
+	github.com/stretchr/testify v1.3.0 // indirect
 	gopkg.in/inf.v0 v0.9.1
 )

+ 0 - 3
go.modverify

@@ -1,3 +0,0 @@
-github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk=
-github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
-gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=

+ 22 - 0
go.sum

@@ -0,0 +1,22 @@
+github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
+github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
+github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
+github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk=
+github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
+github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
+gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=

+ 71 - 52
session.go

@@ -673,7 +673,6 @@ type Query struct {
 	cons                  Consistency
 	pageSize              int
 	routingKey            []byte
-	routingKeyBuffer      []byte
 	pageState             []byte
 	prefetch              float64
 	trace                 Tracer
@@ -942,46 +941,7 @@ func (q *Query) GetRoutingKey() ([]byte, error) {
 		return nil, err
 	}
 
-	if routingKeyInfo == nil {
-		return nil, nil
-	}
-
-	if len(routingKeyInfo.indexes) == 1 {
-		// single column routing key
-		routingKey, err := Marshal(
-			routingKeyInfo.types[0],
-			q.values[routingKeyInfo.indexes[0]],
-		)
-		if err != nil {
-			return nil, err
-		}
-		return routingKey, nil
-	}
-
-	// We allocate that buffer only once, so that further re-bind/exec of the
-	// same query don't allocate more memory.
-	if q.routingKeyBuffer == nil {
-		q.routingKeyBuffer = make([]byte, 0, 256)
-	}
-
-	// composite routing key
-	buf := bytes.NewBuffer(q.routingKeyBuffer)
-	for i := range routingKeyInfo.indexes {
-		encoded, err := Marshal(
-			routingKeyInfo.types[i],
-			q.values[routingKeyInfo.indexes[i]],
-		)
-		if err != nil {
-			return nil, err
-		}
-		lenBuf := []byte{0x00, 0x00}
-		binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
-		buf.Write(lenBuf)
-		buf.Write(encoded)
-		buf.WriteByte(0x00)
-	}
-	routingKey := buf.Bytes()
-	return routingKey, nil
+	return createRoutingKey(routingKeyInfo, q.values)
 }
 
 func (q *Query) shouldPrepare() bool {
@@ -1471,10 +1431,13 @@ type Batch struct {
 	Type                  BatchType
 	Entries               []BatchEntry
 	Cons                  Consistency
+	routingKey            []byte
+	routingKeyBuffer      []byte
 	CustomPayload         map[string][]byte
 	rt                    RetryPolicy
 	spec                  SpeculativeExecutionPolicy
 	observer              BatchObserver
+	session               *Session
 	serialCons            SerialConsistency
 	defaultTimestamp      bool
 	defaultTimestampValue int64
@@ -1499,15 +1462,16 @@ func NewBatch(typ BatchType) *Batch {
 func (s *Session) NewBatch(typ BatchType) *Batch {
 	s.mu.RLock()
 	batch := &Batch{
-		Type:             typ,
-		rt:               s.cfg.RetryPolicy,
-		serialCons:       s.cfg.SerialConsistency,
-		observer:         s.batchObserver,
-		Cons:             s.cons,
-		defaultTimestamp: s.cfg.DefaultTimestamp,
-		keyspace:         s.cfg.Keyspace,
-		metrics:          &queryMetrics{m: make(map[string]*hostMetrics)},
-		spec:             &NonSpeculativeExecution{},
+		Type:              typ,
+		rt:                s.cfg.RetryPolicy,
+		serialCons:        s.cfg.SerialConsistency,
+		observer:          s.batchObserver,
+		session:           s,
+		Cons:              s.cons,
+		defaultTimestamp:  s.cfg.DefaultTimestamp,
+		keyspace:          s.cfg.Keyspace,
+		metrics:           &queryMetrics{m: make(map[string]*hostMetrics)},
+		spec:              &NonSpeculativeExecution{},
 	}
 
 	s.mu.RUnlock()
@@ -1729,8 +1693,63 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
 }
 
 func (b *Batch) GetRoutingKey() ([]byte, error) {
-	// TODO: use the first statement in the batch as the routing key?
-	return nil, nil
+	if b.routingKey != nil {
+		return b.routingKey, nil
+	}
+
+	if len(b.Entries) == 0 {
+		return nil, nil
+	}
+
+	entry := b.Entries[0]
+	if entry.binding != nil {
+		// bindings do not have the values let's skip it like Query does.
+		return nil, nil
+	}
+	// try to determine the routing key
+	routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt)
+	if err != nil {
+		return nil, err
+	}
+
+	return createRoutingKey(routingKeyInfo, entry.Args)
+}
+
+func createRoutingKey(routingKeyInfo *routingKeyInfo, values []interface{}) ([]byte, error) {
+	if routingKeyInfo == nil {
+		return nil, nil
+	}
+
+	if len(routingKeyInfo.indexes) == 1 {
+		// single column routing key
+		routingKey, err := Marshal(
+			routingKeyInfo.types[0],
+			values[routingKeyInfo.indexes[0]],
+		)
+		if err != nil {
+			return nil, err
+		}
+		return routingKey, nil
+	}
+
+	// composite routing key
+	buf := bytes.NewBuffer(make([]byte, 0, 256))
+	for i := range routingKeyInfo.indexes {
+		encoded, err := Marshal(
+			routingKeyInfo.types[i],
+			values[routingKeyInfo.indexes[i]],
+		)
+		if err != nil {
+			return nil, err
+		}
+		lenBuf := []byte{0x00, 0x00}
+		binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
+		buf.Write(lenBuf)
+		buf.Write(encoded)
+		buf.WriteByte(0x00)
+	}
+	routingKey := buf.Bytes()
+	return routingKey, nil
 }
 
 type BatchType byte