
Merge pull request #536 from Zariel/remove-num-streams

Remove num streams
Chris Bannister 10 years ago
parent
commit
10db79a04c
8 changed files with 17 additions and 139 deletions
  1. cassandra_test.go  +6 -6
  2. cluster.go  +0 -1
  3. conn.go  +4 -6
  4. conn_test.go  +6 -113
  5. connectionpool.go  +0 -1
  6. session.go  +0 -9
  7. stress_test.go  +0 -2
  8. uuid_test.go  +1 -1
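
In summary: this PR deletes the user-configurable NumStreams option. The number of streams per connection is now always the protocol maximum (128 for protocol v1/v2, 32768 for v3 and later), which is what the removed clamp in session.go defaulted to anyway. For callers the migration is a one-line deletion; a hypothetical before/after (field values are illustrative):

	// Before this commit (hypothetical caller code):
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.NumConns = 2
	cluster.NumStreams = 128 // this field no longer exists

	// After: the stream limit follows the protocol version automatically.
	cluster = gocql.NewCluster("127.0.0.1")
	cluster.NumConns = 2
	cluster.ProtoVersion = 3 // protocol v3+ allows 32768 streams per conn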

+ 6 - 6
cassandra_test.go

@@ -356,7 +356,7 @@ func TestCAS(t *testing.T) {
 	if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if !applied {
-		t.Fatal("insert should have been applied")
+		t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
 	}
 
 	successBatch = session.NewBatch(LoggedBatch)
@@ -373,7 +373,7 @@ func TestCAS(t *testing.T) {
 	if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if applied {
-		t.Fatal("insert shouldn't have been applied")
+		t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
 	}
 
 	insertBatch := session.NewBatch(LoggedBatch)
@@ -389,10 +389,10 @@ func TestCAS(t *testing.T) {
 	if applied, iter, err := session.ExecuteBatchCAS(failBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if applied {
-		t.Fatal("insert shouldn't have been applied")
+		t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
 	} else {
 		if scan := iter.Scan(&applied, &titleCAS, &revidCAS, &modifiedCAS); scan && applied {
-			t.Fatal("insert shouldn't have been applied")
+			t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
 		} else if !scan {
 			t.Fatal("should have scanned another row")
 		}
@@ -428,7 +428,7 @@ func TestMapScanCAS(t *testing.T) {
 		title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if !applied {
-		t.Fatal("insert should have been applied")
+		t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", title, revid, modified)
 	}
 
 	mapCAS = map[string]interface{}{}
@@ -437,7 +437,7 @@ func TestMapScanCAS(t *testing.T) {
 		title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
 		t.Fatal("insert:", err)
 	} else if applied {
-		t.Fatal("insert should not have been applied")
+		t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", title, revid, modified)
 	} else if title != mapCAS["title"] || revid != mapCAS["revid"] || deleted != mapCAS["deleted"] {
 		t.Fatalf("expected %s/%v/%v/%v but got %s/%v/%v%v", title, revid, modified, false, mapCAS["title"], mapCAS["revid"], mapCAS["last_modified"], mapCAS["deleted"])
 	}

+ 0 - 1
cluster.go

@@ -89,7 +89,6 @@ type ClusterConfig struct {
 	Port              int               // port (default: 9042)
 	Keyspace          string            // initial keyspace (optional)
 	NumConns          int               // number of connections per host (default: 2)
-	NumStreams        int               // number of streams per connection (default: max per protocol, either 128 or 32768)
 	Consistency       Consistency       // default consistency level (default: Quorum)
 	Compressor        Compressor        // compression algorithm (default: nil)
 	Authenticator     Authenticator     // authenticator (default: nil)
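
The deleted comment names the two per-protocol limits. They follow from the wire format: the stream ID is a signed 8-bit field in CQL protocol v1/v2 and a signed 16-bit field in v3+, and only non-negative IDs are available for client requests, giving 2^7 and 2^15 usable streams. A small illustration (constant names are ours, not the library's):

	// Stream IDs are int8 in CQL protocol v1/v2 and int16 in v3+;
	// negative IDs are reserved, so the usable space is 2^7 and 2^15.
	const (
		maxStreamsProtoV2 = 1 << 7  // 128
		maxStreamsProtoV3 = 1 << 15 // 32768
	)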

+ 4 - 6
conn.go

@@ -93,7 +93,6 @@ type ConnConfig struct {
 	ProtoVersion  int
 	CQLVersion    string
 	Timeout       time.Duration
-	NumStreams    int
 	Compressor    Compressor
 	Authenticator Authenticator
 	Keepalive     time.Duration
@@ -114,11 +113,10 @@ var TimeoutLimit int64 = 10
 // queries, but users are usually advised to use a more reliable, higher
 // level API.
 type Conn struct {
-	conn       net.Conn
-	r          *bufio.Reader
-	timeout    time.Duration
-	cfg        *ConnConfig
-	numStreams int
+	conn    net.Conn
+	r       *bufio.Reader
+	timeout time.Duration
+	cfg     *ConnConfig
 
 	headerBuf []byte
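
With cfg.NumStreams gone, a connection can size its in-flight request bookkeeping purely from the protocol version. A minimal sketch of that idea with hypothetical names (callReq, newSketchConn, and the field layout are illustrative, not the library's actual internals):

	// callReq stands in for whatever per-request state the conn keeps.
	type callReq struct {
		resp chan error
	}

	type sketchConn struct {
		calls []*callReq // one slot per stream ID
	}

	func newSketchConn(protoVersion int) *sketchConn {
		n := 128 // protocol v1/v2
		if protoVersion > 2 {
			n = 32768 // protocol v3+
		}
		return &sketchConn{calls: make([]*callReq, n)}
	}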
 

+ 6 - 113
conn_test.go

@@ -191,40 +191,6 @@ func TestQueryRetry(t *testing.T) {
 	}
 }
 
-func TestConnClosing(t *testing.T) {
-	t.Skip("Skipping until test can be ran reliably")
-
-	srv := NewTestServer(t, protoVersion2)
-	defer srv.Stop()
-
-	db, err := NewCluster(srv.Address).CreateSession()
-	if err != nil {
-		t.Fatalf("NewCluster: %v", err)
-	}
-	defer db.Close()
-
-	numConns := db.cfg.NumConns
-	count := db.cfg.NumStreams * numConns
-
-	wg := &sync.WaitGroup{}
-	wg.Add(count)
-	for i := 0; i < count; i++ {
-		go func(wg *sync.WaitGroup) {
-			wg.Done()
-			db.Query("kill").Exec()
-		}(wg)
-	}
-
-	wg.Wait()
-
-	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
-	conns := db.pool.Size()
-
-	if conns != numConns {
-		t.Errorf("Expected to have %d connections but have %d", numConns, conns)
-	}
-}
-
 func TestStreams_Protocol1(t *testing.T) {
 	srv := NewTestServer(t, protoVersion1)
 	defer srv.Stop()
@@ -242,7 +208,7 @@ func TestStreams_Protocol1(t *testing.T) {
 	defer db.Close()
 
 	var wg sync.WaitGroup
-	for i := 1; i < db.cfg.NumStreams; i++ {
+	for i := 1; i < 128; i++ {
 		// here we're just validating that if we send NumStreams requests we get
 		// a response for every stream and the lengths for the queries are set
 		// correctly.
@@ -257,33 +223,6 @@ func TestStreams_Protocol1(t *testing.T) {
 	wg.Wait()
 }
 
-func TestStreams_Protocol2(t *testing.T) {
-	srv := NewTestServer(t, protoVersion2)
-	defer srv.Stop()
-
-	// TODO: these are more like session tests and should instead operate
-	// on a single Conn
-	cluster := NewCluster(srv.Address)
-	cluster.NumConns = 1
-	cluster.ProtoVersion = 2
-
-	db, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer db.Close()
-
-	for i := 1; i < db.cfg.NumStreams; i++ {
-		// the test server processes each conn synchronously
-		// here were just validating that if we send NumStream request we get
-		// a response for every stream and the lengths for the queries are set
-		// correctly.
-		if err = db.Query("void").Exec(); err != nil {
-			t.Fatal(err)
-		}
-	}
-}
-
 func TestStreams_Protocol3(t *testing.T) {
 	srv := NewTestServer(t, protoVersion3)
 	defer srv.Stop()
@@ -300,7 +239,7 @@ func TestStreams_Protocol3(t *testing.T) {
 	}
 	defer db.Close()
 
-	for i := 1; i < db.cfg.NumStreams; i++ {
+	for i := 1; i < 32768; i++ {
 		// the test server processes each conn synchronously
 		// here we're just validating that if we send NumStreams requests we get
 		// a response for every stream and the lengths for the queries are set
@@ -472,6 +411,10 @@ func TestQueryTimeout(t *testing.T) {
 }
 
 func TestQueryTimeoutReuseStream(t *testing.T) {
+	t.Skip("no longer tests anything")
+	// TODO(zariel): move this to conn test, we really just want to check what
+	// happens when a conn is
+
 	srv := NewTestServer(t, defaultProto)
 	defer srv.Stop()
 
@@ -480,7 +423,6 @@ func TestQueryTimeoutReuseStream(t *testing.T) {
 	// timely manner.
 	cluster.Timeout = 1 * time.Millisecond
 	cluster.NumConns = 1
-	cluster.NumStreams = 1
 
 	db, err := cluster.CreateSession()
 	if err != nil {
@@ -505,7 +447,6 @@ func TestQueryTimeoutClose(t *testing.T) {
 	// timely manner.
 	cluster.Timeout = 1000 * time.Millisecond
 	cluster.NumConns = 1
-	cluster.NumStreams = 1
 
 	db, err := cluster.CreateSession()
 	if err != nil {
@@ -532,54 +473,6 @@ func TestQueryTimeoutClose(t *testing.T) {
 	}
 }
 
-func TestExecPanic(t *testing.T) {
-	t.Skip("test can cause unrelated failures, skipping until it can be fixed.")
-	srv := NewTestServer(t, defaultProto)
-	defer srv.Stop()
-
-	cluster := NewCluster(srv.Address)
-	// Set the timeout arbitrarily low so that the query hits the timeout in a
-	// timely manner.
-	cluster.Timeout = 5 * time.Millisecond
-	cluster.NumConns = 1
-	// cluster.NumStreams = 1
-
-	db, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer db.Close()
-
-	streams := db.cfg.NumStreams
-
-	wg := &sync.WaitGroup{}
-	wg.Add(streams)
-	for i := 0; i < streams; i++ {
-		go func() {
-			defer wg.Done()
-			q := db.Query("void")
-			for {
-				if err := q.Exec(); err != nil {
-					return
-				}
-			}
-		}()
-	}
-
-	wg.Add(1)
-
-	go func() {
-		defer wg.Done()
-		for i := 0; i < int(TimeoutLimit); i++ {
-			db.Query("timeout").Exec()
-		}
-	}()
-
-	wg.Wait()
-
-	time.Sleep(500 * time.Millisecond)
-}
-
 func NewTestServer(t testing.TB, protocol uint8) *TestServer {
 	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
 	if err != nil {

+ 0 - 1
connectionpool.go

@@ -97,7 +97,6 @@ func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
 			ProtoVersion:  cfg.ProtoVersion,
 			CQLVersion:    cfg.CQLVersion,
 			Timeout:       cfg.Timeout,
-			NumStreams:    cfg.NumStreams,
 			Compressor:    cfg.Compressor,
 			Authenticator: cfg.Authenticator,
 			Keepalive:     cfg.SocketKeepalive,

+ 0 - 9
session.go

@@ -54,15 +54,6 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		return nil, ErrNoHosts
 	}
 
-	maxStreams := 128
-	if cfg.ProtoVersion > protoVersion2 {
-		maxStreams = 32768
-	}
-
-	if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
-		cfg.NumStreams = maxStreams
-	}
-
 	//Adjust the size of the prepared statements cache to match the latest configuration
 	stmtsLRU.Lock()
 	initStmtsLRU(cfg.MaxPreparedStmts)

+ 0 - 2
stress_test.go

@@ -13,7 +13,6 @@ func BenchmarkConnStress(b *testing.B) {
 
 	cluster := createCluster()
 	cluster.NumConns = 1
-	cluster.NumStreams = workers
 	session := createSessionFromCluster(cluster, b)
 	defer session.Close()
 
@@ -43,7 +42,6 @@ func BenchmarkConnRoutingKey(b *testing.B) {
 
 	cluster := createCluster()
 	cluster.NumConns = 1
-	cluster.NumStreams = workers
 	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
 	session := createSessionFromCluster(cluster, b)
 	defer session.Close()
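
With the field removed, the benchmarks no longer tie the stream count to the worker count: the worker goroutines simply multiplex over the protocol-determined stream space of the single connection. A standalone sketch of that shape (not the repo's actual benchmark; assumes a reachable node on 127.0.0.1):

	import (
		"testing"

		"github.com/gocql/gocql"
	)

	func BenchmarkSharedStreams(b *testing.B) {
		cluster := gocql.NewCluster("127.0.0.1")
		cluster.NumConns = 1 // all parallel queries share one conn
		session, err := cluster.CreateSession()
		if err != nil {
			b.Fatal(err)
		}
		defer session.Close()

		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				// stream IDs are assigned internally per request
				if err := session.Query("SELECT now() FROM system.local").Exec(); err != nil {
					b.Error(err)
				}
			}
		})
	}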

+ 1 - 1
uuid_test.go

@@ -170,7 +170,7 @@ func TestTimeUUID(t *testing.T) {
 
 		ts := uuid.Timestamp()
 		if ts < timestamp {
-			t.Errorf("timestamps must grow")
+			t.Errorf("timestamps must grow: timestamp=%v ts=%v", timestamp, ts)
 		}
 		timestamp = ts
 	}