浏览代码

Moved preparedLRU definition to cluster.go as it seems more fitting. Reduced scope of locking for the prepared statement cache in the error handling code when executing batch statement. Improved the test cases.

Phillip Couto 11 年之前
父节点
当前提交
567a7eaaba
共有 3 个文件被更改,包括 51 次插入38 次删除
  1. 25 19
      cassandra_test.go
  2. 24 3
      cluster.go
  3. 2 16
      conn.go

+ 25 - 19
cassandra_test.go

@@ -636,10 +636,7 @@ func TestPreparedCacheEviction(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 	stmtsLRU.mu.Lock()
-	for stmtsLRU.lru.Len() > 10 {
-		stmtsLRU.lru.RemoveOldest()
-	}
-	stmtsLRU.lru.MaxEntries = 10
+	stmtsLRU.Max(10)
 	stmtsLRU.mu.Unlock()
 
 	if err := session.Query(`CREATE TABLE prepCacheEvict (
@@ -650,7 +647,7 @@ PRIMARY KEY (id)
 		t.Fatal("create table:", err)
 	}
 
-	for i := 0; i < 100; i++ {
+	for i := 0; i < 15; i++ {
 		if err := session.Query(`INSERT INTO prepCacheEvict (id,mod) VALUES (?, ?)`,
 			i, 10000%(i+1)).Exec(); err != nil {
 			t.Fatal("insert:", err)
@@ -658,7 +655,7 @@ PRIMARY KEY (id)
 	}
 
 	var id, mod int
-	for i := 0; i < 100; i++ {
+	for i := 0; i < 15; i++ {
 		err := session.Query("SELECT id,mod FROM prepcacheevict WHERE id = "+strconv.FormatInt(int64(i), 10)).Scan(&id, &mod)
 		if err != nil {
 			t.Error("select prepcacheevit:", err)
@@ -670,27 +667,20 @@ PRIMARY KEY (id)
 	}
 }
 
-//TestPreparedCacheAccuracy will test to make sure cached queries are moving to expected positions within the cache
-func TestPreparedCacheAccuracy(t *testing.T) {
+//TestPreparedCacheAccuracy will test to make sure cached queries are being retained properly
+func TestPreparedCaching(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 	//purge cache
 	stmtsLRU.mu.Lock()
-	for stmtsLRU.lru.Len() > 0 {
-		stmtsLRU.lru.RemoveOldest()
-	}
-	stmtsLRU.lru.MaxEntries = 10
+	stmtsLRU.Max(2)
 	stmtsLRU.mu.Unlock()
 
-	if err := session.Query(`CREATE TABLE prepCacheacc (
-id int,
-mod int,
-PRIMARY KEY (id)
-)`).Exec(); err != nil {
+	if err := session.Query(`CREATE TABLE prepCacheacc (id int,mod int,PRIMARY KEY (id))`).Exec(); err != nil {
 		t.Fatal("create table:", err)
 	}
 
-	for i := 0; i < 100; i++ {
+	for i := 0; i < 4; i++ {
 		if err := session.Query(`INSERT INTO prepCacheacc (id,mod) VALUES (?, ?)`,
 			i, 10000%(i+1)).Exec(); err != nil {
 			t.Fatal("insert:", err)
@@ -698,7 +688,7 @@ PRIMARY KEY (id)
 	}
 
 	var id, mod int
-	for i := 0; i < 100; i++ {
+	for i := 0; i < 4; i++ {
 		err := session.Query("SELECT id,mod FROM prepCacheacc WHERE id = ?", i).Scan(&id, &mod)
 		if err != nil {
 			t.Error("select prepCacheacc:", err)
@@ -708,4 +698,20 @@ PRIMARY KEY (id)
 	if stmtsLRU.lru.Len() != 2 {
 		t.Errorf("expected cache size of %v, got %v", 2, stmtsLRU.lru.Len())
 	}
+
+	//Walk through all the configured hosts and see if the queries were cached.
+	var insFound, selFound bool
+	for i := range session.cfg.Hosts {
+		_, ok := stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042INSERT INTO prepCacheacc (id,mod) VALUES (?, ?)")
+		insFound = insFound || ok
+
+		_, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042SELECT id,mod FROM prepCacheacc WHERE id = ?")
+		selFound = selFound || ok
+	}
+	if !insFound {
+		t.Error("expected cache to retain insert statement, but statement was purged.")
+	}
+	if !selFound {
+		t.Error("expected cache to retain select statement, but statement was purged.")
+	}
 }

+ 24 - 3
cluster.go

@@ -6,9 +6,29 @@ package gocql
 
 import (
 	"errors"
+	"github.com/golang/groupcache/lru"
+	"sync"
 	"time"
 )
 
+//Package global reference to Prepared Statements LRU
+var stmtsLRU preparedLRU
+
+//preparedLRU is the prepared statement cache
+type preparedLRU struct {
+	lru *lru.Cache
+	mu  sync.Mutex
+}
+
+//Max adjusts the maximum size of the cache and cleans up the oldest records if
+//the new max is lower than the previous value. Not concurrency safe.
+func (p *preparedLRU) Max(max int) {
+	for p.lru.Len() > max {
+		p.lru.RemoveOldest()
+	}
+	p.lru.MaxEntries = max
+}
+
 // ClusterConfig is a struct to configure the default cluster implementation
 // of gocoql. It has a varity of attributes that can be used to modify the
 // behavior to fit the most common use cases. Applications that requre a
@@ -62,10 +82,11 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 
 	//Adjust the size of the prepared statements cache to match the latest configuration
 	stmtsLRU.mu.Lock()
-	for stmtsLRU.lru.Len() > cfg.MaxPreparedStmts {
-		stmtsLRU.lru.RemoveOldest()
+	if stmtsLRU.lru != nil {
+		stmtsLRU.Max(cfg.MaxPreparedStmts)
+	} else {
+		stmtsLRU.lru = lru.New(cfg.MaxPreparedStmts)
 	}
-	stmtsLRU.lru.MaxEntries = cfg.MaxPreparedStmts
 	stmtsLRU.mu.Unlock()
 
 	//See if there are any connections in the pool

+ 2 - 16
conn.go

@@ -7,7 +7,6 @@ package gocql
 import (
 	"bufio"
 	"fmt"
-	"github.com/golang/groupcache/lru"
 	"net"
 	"sync"
 	"sync/atomic"
@@ -18,14 +17,6 @@ const defaultFrameSize = 4096
 const flagResponse = 0x80
 const maskVersion = 0x7F
 
-//Package global reference to Prepared Statements LRU
-var stmtsLRU *preparedLRU
-
-//init houses could to initialize components related to connections like LRU for prepared statements
-func init() {
-	stmtsLRU = &preparedLRU{lru: lru.New(10)}
-}
-
 type Authenticator interface {
 	Challenge(req []byte) (resp []byte, auth Authenticator, err error)
 	Success(data []byte) error
@@ -510,12 +501,12 @@ func (c *Conn) executeBatch(batch *Batch) error {
 	case resultVoidFrame:
 		return nil
 	case RequestErrUnprepared:
-		stmtsLRU.mu.Lock()
 		stmt, found := stmts[string(x.StatementId)]
 		if found {
+			stmtsLRU.mu.Lock()
 			stmtsLRU.lru.Remove(c.addr + stmt)
+			stmtsLRU.mu.Unlock()
 		}
-		stmtsLRU.mu.Unlock()
 		if found {
 			return c.executeBatch(batch)
 		} else {
@@ -627,8 +618,3 @@ type inflightPrepare struct {
 	err  error
 	wg   sync.WaitGroup
 }
-
-type preparedLRU struct {
-	lru *lru.Cache
-	mu  sync.Mutex
-}