Forráskód Böngészése

Consolidate CreateSession into NewSession

CreateSession on the cluster config is a helper method to setup
defaults for the session, but it also does a lot of the setup, such
as creating the connection pool and creating the host discovery
goroutine.

Move all this logic into NewSession so that CreateSession now just
returns NewSession.
Chris Bannister 10 éve
szülő
commit
6a5b608c60
3 módosított fájl, 65 hozzáadás és 58 törlés
  1. 1 46
      cluster.go
  2. 53 5
      session.go
  3. 11 7
      session_test.go

+ 1 - 46
cluster.go

@@ -101,52 +101,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
 // CreateSession initializes the cluster based on this config and returns a
 // session object that can be used to interact with the database.
 func (cfg *ClusterConfig) CreateSession() (*Session, error) {
-
-	//Check that hosts in the ClusterConfig is not empty
-	if len(cfg.Hosts) < 1 {
-		return nil, ErrNoHosts
-	}
-
-	maxStreams := 128
-	if cfg.ProtoVersion > protoVersion2 {
-		maxStreams = 32768
-	}
-
-	if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
-		cfg.NumStreams = maxStreams
-	}
-
-	pool, err := cfg.ConnPoolType(cfg)
-	if err != nil {
-		return nil, err
-	}
-
-	//Adjust the size of the prepared statements cache to match the latest configuration
-	stmtsLRU.Lock()
-	initStmtsLRU(cfg.MaxPreparedStmts)
-	stmtsLRU.Unlock()
-
-	//See if there are any connections in the pool
-	if pool.Size() > 0 {
-		s := NewSession(pool, *cfg)
-		s.SetConsistency(cfg.Consistency)
-		s.SetPageSize(cfg.PageSize)
-
-		if cfg.DiscoverHosts {
-			hostSource := &ringDescriber{
-				session:    s,
-				dcFilter:   cfg.Discovery.DcFilter,
-				rackFilter: cfg.Discovery.RackFilter,
-			}
-
-			go hostSource.run(cfg.Discovery.Sleep)
-		}
-
-		return s, nil
-	}
-
-	pool.Close()
-	return nil, ErrNoConnectionsStarted
+	return NewSession(*cfg)
 }
 
 var (

+ 53 - 5
session.go

@@ -44,13 +44,61 @@ type Session struct {
 }
 
 // NewSession wraps an existing Node.
-func NewSession(p ConnectionPool, c ClusterConfig) *Session {
-	session := &Session{Pool: p, cons: c.Consistency, prefetch: 0.25, cfg: c}
+func NewSession(cfg ClusterConfig) (*Session, error) {
+	//Check that hosts in the ClusterConfig is not empty
+	if len(cfg.Hosts) < 1 {
+		return nil, ErrNoHosts
+	}
+
+	maxStreams := 128
+	if cfg.ProtoVersion > protoVersion2 {
+		maxStreams = 32768
+	}
+
+	if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
+		cfg.NumStreams = maxStreams
+	}
+
+	pool, err := cfg.ConnPoolType(&cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	//Adjust the size of the prepared statements cache to match the latest configuration
+	stmtsLRU.Lock()
+	initStmtsLRU(cfg.MaxPreparedStmts)
+	stmtsLRU.Unlock()
+
+	s := &Session{
+		Pool:     pool,
+		cons:     cfg.Consistency,
+		prefetch: 0.25,
+		cfg:      cfg,
+	}
+
+	//See if there are any connections in the pool
+	if pool.Size() > 0 {
+		s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
+
+		s.SetConsistency(cfg.Consistency)
+		s.SetPageSize(cfg.PageSize)
+
+		if cfg.DiscoverHosts {
+			hostSource := &ringDescriber{
+				session:    s,
+				dcFilter:   cfg.Discovery.DcFilter,
+				rackFilter: cfg.Discovery.RackFilter,
+			}
+
+			go hostSource.run(cfg.Discovery.Sleep)
+		}
+
+		return s, nil
+	}
 
-	// create the query info cache
-	session.routingKeyInfoCache.lru = lru.New(c.MaxRoutingKeyInfo)
+	s.Close()
 
-	return session
+	return nil, ErrNoConnectionsStarted
 }
 
 // SetConsistency sets the default consistency level for this session. This

+ 11 - 7
session_test.go

@@ -9,13 +9,18 @@ import (
 
 func TestSessionAPI(t *testing.T) {
 
-	cfg := ClusterConfig{}
-	pool, err := NewSimplePool(&cfg)
+	cfg := &ClusterConfig{}
+	pool, err := NewSimplePool(cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	s := NewSession(pool, cfg)
+	s := &Session{
+		Pool: pool,
+		cfg:  *cfg,
+		cons: Quorum,
+	}
+
 	defer s.Close()
 
 	s.SetConsistency(All)
@@ -154,14 +159,13 @@ func TestQueryShouldPrepare(t *testing.T) {
 
 func TestBatchBasicAPI(t *testing.T) {
 
-	cfg := ClusterConfig{}
+	cfg := NewCluster("127.0.0.1")
 	cfg.RetryPolicy = &SimpleRetryPolicy{NumRetries: 2}
-	pool, err := NewSimplePool(&cfg)
+
+	s, err := cfg.CreateSession()
 	if err != nil {
 		t.Fatal(err)
 	}
-
-	s := NewSession(pool, cfg)
 	defer s.Close()
 
 	b := s.NewBatch(UnloggedBatch)