Forráskód Böngészése

Merge pull request #454 from abustany/routing-key-improvements

Routing key improvements
Chris Bannister 10 éve
szülő
commit
7e37aa0ace
2 módosított fájl, 50 hozzáadás és 9 törlés
  1. 17 9
      session.go
  2. 33 0
      stress_test.go

+ 17 - 9
session.go

@@ -260,9 +260,8 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
 // returns routing key indexes and type info
 func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	s.routingKeyInfoCache.mu.Lock()
-	cacheKey := s.cfg.Keyspace + stmt
 
-	entry, cached := s.routingKeyInfoCache.lru.Get(cacheKey)
+	entry, cached := s.routingKeyInfoCache.lru.Get(stmt)
 	if cached {
 		// done accessing the cache
 		s.routingKeyInfoCache.mu.Unlock()
@@ -286,7 +285,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	inflight := new(inflightCachedEntry)
 	inflight.wg.Add(1)
 	defer inflight.wg.Done()
-	s.routingKeyInfoCache.lru.Add(cacheKey, inflight)
+	s.routingKeyInfoCache.lru.Add(stmt, inflight)
 	s.routingKeyInfoCache.mu.Unlock()
 
 	var (
@@ -300,14 +299,14 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		// no connections
 		inflight.err = ErrNoConnections
 		// don't cache this error
-		s.routingKeyInfoCache.Remove(cacheKey)
+		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 	}
 
 	prepared, inflight.err = conn.prepareStatement(stmt, nil)
 	if inflight.err != nil {
 		// don't cache this error
-		s.routingKeyInfoCache.Remove(cacheKey)
+		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 	}
 
@@ -323,7 +322,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
 	if inflight.err != nil {
 		// don't cache this error
-		s.routingKeyInfoCache.Remove(cacheKey)
+		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 	}
 
@@ -334,7 +333,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		// in the metadata code, or that the table was just dropped.
 		inflight.err = ErrNoMetadata
 		// don't cache this error
-		s.routingKeyInfoCache.Remove(cacheKey)
+		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 	}
 
@@ -422,6 +421,7 @@ type Query struct {
 	cons             Consistency
 	pageSize         int
 	routingKey       []byte
+	routingKeyBuffer []byte
 	pageState        []byte
 	prefetch         float64
 	trace            Tracer
@@ -532,8 +532,14 @@ func (q *Query) GetRoutingKey() ([]byte, error) {
 		return routingKey, nil
 	}
 
+	// We allocate that buffer only once, so that further re-bind/exec of the
+	// same query don't allocate more memory.
+	if q.routingKeyBuffer == nil {
+		q.routingKeyBuffer = make([]byte, 0, 256)
+	}
+
 	// composite routing key
-	buf := &bytes.Buffer{}
+	buf := bytes.NewBuffer(q.routingKeyBuffer)
 	for i := range routingKeyInfo.indexes {
 		encoded, err := Marshal(
 			routingKeyInfo.types[i],
@@ -542,7 +548,9 @@ func (q *Query) GetRoutingKey() ([]byte, error) {
 		if err != nil {
 			return nil, err
 		}
-		binary.Write(buf, binary.BigEndian, int16(len(encoded)))
+		lenBuf := []byte{0x00, 0x00}
+		binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
+		buf.Write(lenBuf)
 		buf.Write(encoded)
 		buf.WriteByte(0x00)
 	}

+ 33 - 0
stress_test.go

@@ -38,3 +38,36 @@ func BenchmarkConnStress(b *testing.B) {
 	b.RunParallel(writer)
 
 }
+
+func BenchmarkConnRoutingKey(b *testing.B) {
+	const workers = 16
+
+	cluster := createCluster()
+	cluster.NumConns = 1
+	cluster.NumStreams = workers
+	cluster.ConnPoolType = NewTokenAwareConnPool
+	session := createSessionFromCluster(cluster, b)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE IF NOT EXISTS routing_key_stress (id int primary key)"); err != nil {
+		b.Fatal(err)
+	}
+
+	var seed uint64
+	writer := func(pb *testing.PB) {
+		seed := atomic.AddUint64(&seed, 1)
+		var i uint64 = 0
+		query := session.Query("insert into routing_key_stress (id) values (?)")
+
+		for pb.Next() {
+			if _, err := query.Bind(i * seed).GetRoutingKey(); err != nil {
+				b.Error(err)
+				return
+			}
+			i++
+		}
+	}
+
+	b.SetParallelism(workers)
+	b.RunParallel(writer)
+}